From 5bc1d1329112a314d0de5ea269374c14d873b1a6 Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 13:54:52 -0300 Subject: [PATCH 01/18] chore: add pypdf dependency for PDF document support --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index f8fe77f..94290f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "opentelemetry-exporter-otlp>=1.25.0", "slowapi>=0.1.9", "python-json-logger>=2.0.0", + "pypdf>=4.0.0", ] [project.optional-dependencies] From 84c268fa4a3d02882566abbe6e1e60aa22053c38 Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 13:58:09 -0300 Subject: [PATCH 02/18] feat: compile StateGraph with MemorySaver and update AgentState with history and tenant_id --- src/app/graph/state.py | 3 +++ src/app/graph/workflow.py | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/app/graph/state.py b/src/app/graph/state.py index 1c92451..57b9b21 100644 --- a/src/app/graph/state.py +++ b/src/app/graph/state.py @@ -27,3 +27,6 @@ class AgentState(TypedDict): api_key: NotRequired[str | None] provider: NotRequired[str | None] error: NotRequired[str | None] + tenant_id: NotRequired[str | None] + history: NotRequired[list[dict[str, str]] | None] + diff --git a/src/app/graph/workflow.py b/src/app/graph/workflow.py index c967b86..ebe2078 100644 --- a/src/app/graph/workflow.py +++ b/src/app/graph/workflow.py @@ -14,6 +14,7 @@ rewrite_query ──[retries >= max]─→ fallback → END """ +from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import END, StateGraph from src.app.core.config import settings @@ -28,6 +29,7 @@ from src.app.graph.router import router_node from src.app.graph.state import AgentState + # ── Conditional edge functions ────────────────────────────────────────── @@ -122,4 +124,4 @@ def get_workflow(): workflow.add_edge("direct_response", END) workflow.add_edge("fallback", END) - return workflow.compile() + return workflow.compile(checkpointer=MemorySaver()) From 0916adea2c1c58f14d58b055e2d7ebe7233c61ed Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 13:59:38 -0300 Subject: [PATCH 03/18] feat: partition VectorStoreService by tenant using dynamic collections --- src/app/services/vector_store.py | 46 ++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/src/app/services/vector_store.py b/src/app/services/vector_store.py index b628007..43c2e52 100644 --- a/src/app/services/vector_store.py +++ b/src/app/services/vector_store.py @@ -15,35 +15,41 @@ def __init__(self): model_name="sentence-transformers/all-MiniLM-L6-v2" ) self.persist_directory = settings.chroma_persist_directory - - # Initialize the Chroma DB os.makedirs(self.persist_directory, exist_ok=True) - self.vector_store = Chroma( - collection_name="vortex_kb", - embedding_function=self.embeddings, - persist_directory=self.persist_directory, - ) + self._collections: dict[str, Chroma] = {} - def search(self, query: str, k: int = 4) -> list[Document]: + def _get_collection(self, tenant_id: str | None = None) -> Chroma: + collection_name = f"vortex_kb_{tenant_id}" if tenant_id else "vortex_kb" + if collection_name not in self._collections: + self._collections[collection_name] = Chroma( + collection_name=collection_name, + embedding_function=self.embeddings, + persist_directory=self.persist_directory, + ) + return self._collections[collection_name] + + def search(self, query: str, k: int = 4, tenant_id: str | None = None) -> list[Document]: """Search the vector store for relevant documents.""" - return self.vector_store.similarity_search(query, k=k) + db = self._get_collection(tenant_id) + return db.similarity_search(query, k=k) - def add_documents(self, documents: list[Document]): + def add_documents(self, documents: list[Document], tenant_id: str | None = None): """Add documents to the vector store.""" - self.vector_store.add_documents(documents) + db = self._get_collection(tenant_id) + db.add_documents(documents) - def document_count(self) -> int: + def document_count(self, tenant_id: str | None = None) -> int: """Return the total number of documents in the collection.""" - return self.vector_store._collection.count() + db = self._get_collection(tenant_id) + return db._collection.count() - def clear(self): + def clear(self, tenant_id: str | None = None): """Delete all documents from the collection and re-initialize.""" - self.vector_store.delete_collection() - self.vector_store = Chroma( - collection_name="vortex_kb", - embedding_function=self.embeddings, - persist_directory=self.persist_directory, - ) + db = self._get_collection(tenant_id) + db.delete_collection() + collection_name = f"vortex_kb_{tenant_id}" if tenant_id else "vortex_kb" + if collection_name in self._collections: + del self._collections[collection_name] # ── Lazy singleton ────────────────────────────────────────────────────── From a0eec09d20ee9a1071449cc09431c7c3824ea9be Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:00:05 -0300 Subject: [PATCH 04/18] feat: isolate semantic cache lookup and updates by tenant_id --- src/app/services/cache.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/app/services/cache.py b/src/app/services/cache.py index 4c32979..26d612e 100644 --- a/src/app/services/cache.py +++ b/src/app/services/cache.py @@ -34,7 +34,7 @@ def __init__(self): ) self.distance_threshold = settings.semantic_cache_threshold - def lookup(self, query: str, provider: str | None = None) -> dict | None: + def lookup(self, query: str, provider: str | None = None, tenant_id: str | None = None) -> dict | None: """ Lookup a query in the cache. Matches semantically similar queries with cosine distance below the configured threshold. @@ -42,7 +42,10 @@ def lookup(self, query: str, provider: str | None = None) -> dict | None: with CacheTimer() as timer: try: active_provider = provider or settings.llm_provider - filter_dict = {"provider": active_provider} + filter_dict = { + "provider": active_provider, + "tenant_id": tenant_id or "default", + } results = self.vector_store.similarity_search_with_score( query, k=1, filter=filter_dict @@ -110,6 +113,7 @@ def update( steps: list, route: str | None, provider: str | None = None, + tenant_id: str | None = None, ): """Store a successful query-response pair in the semantic cache.""" try: @@ -122,13 +126,15 @@ def update( "steps": json.dumps(steps), "route": route or "", "provider": active_provider, + "tenant_id": tenant_id or "default", }, ) self.vector_store.add_documents([doc]) logger.info( - "Stored query in semantic cache: '%s' [provider: %s]", + "Stored query in semantic cache: '%s' [provider: %s, tenant: %s]", query, active_provider, + tenant_id or "default", ) except Exception as e: logger.error("Failed to update semantic cache: %s", e, exc_info=True) From e5b26e61afbd2bb8061cc653560b2609fc2319b4 Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:01:13 -0300 Subject: [PATCH 05/18] feat: refactor all graph nodes to be async with history support and tag tracking --- src/app/graph/nodes.py | 124 +++++++++++++++++++++++++++++----------- src/app/graph/router.py | 11 +++- 2 files changed, 101 insertions(+), 34 deletions(-) diff --git a/src/app/graph/nodes.py b/src/app/graph/nodes.py index c818c58..b09c16a 100644 --- a/src/app/graph/nodes.py +++ b/src/app/graph/nodes.py @@ -5,6 +5,7 @@ and returns a partial state update dict that LangGraph merges back. """ +import asyncio import logging from typing import Any @@ -28,15 +29,16 @@ class Grade(BaseModel): # ── Nodes ──────────────────────────────────────────────────────────────── -def retrieve_node(state: AgentState) -> dict[str, Any]: +async def retrieve_node(state: AgentState) -> dict[str, Any]: """Retrieve documents from ChromaDB based on the current question.""" question = state["question"] + tenant_id = state.get("tenant_id") steps = list(state.get("steps", [])) steps.append("retrieve_documents") try: vs = get_vector_store() - docs = vs.search(question) + docs = await asyncio.to_thread(vs.search, question, tenant_id=tenant_id) return {"documents": docs, "question": question, "steps": steps} except Exception as e: logger.error("Retrieve node failed: %s", e, exc_info=True) @@ -48,7 +50,7 @@ def retrieve_node(state: AgentState) -> dict[str, Any]: } -def grade_documents_node(state: AgentState) -> dict[str, Any]: +async def grade_documents_node(state: AgentState) -> dict[str, Any]: """ Evaluate whether retrieved documents are relevant to the question. @@ -88,13 +90,22 @@ def grade_documents_node(state: AgentState) -> dict[str, Any]: retrieval_grader = grade_prompt | structured_llm_grader - filtered_docs = [] - for d in documents: - score = retrieval_grader.invoke( - {"question": question, "document": d.page_content} - ) - if score.binary_score == "yes": # type: ignore[union-attr] - filtered_docs.append(d) + async def _grade_single_doc(doc): + try: + score = await retrieval_grader.ainvoke( + {"question": question, "document": doc.page_content} + ) + if score.binary_score == "yes": # type: ignore[union-attr] + return doc + except Exception as grade_err: + logger.warning("Failed to grade document: %s", grade_err) + return None + + # Concurrently grade all retrieved documents + graded_results = await asyncio.gather( + *(_grade_single_doc(d) for d in documents) + ) + filtered_docs = [d for d in graded_results if d is not None] return {"documents": filtered_docs, "question": question, "steps": steps} except Exception as e: @@ -107,7 +118,7 @@ def grade_documents_node(state: AgentState) -> dict[str, Any]: } -def generate_node(state: AgentState) -> dict[str, Any]: +async def generate_node(state: AgentState) -> dict[str, Any]: """Generate the final answer using RAG context from graded documents.""" question = state["question"] documents = state["documents"] @@ -116,12 +127,20 @@ def generate_node(state: AgentState) -> dict[str, Any]: try: llm = get_llm(api_key=state.get("api_key"), provider=state.get("provider")) + + # Build history string + history = state.get("history") or [] + history_str = "" + for turn in history: + history_str += f"User: {turn.get('question', '')}\nAssistant: {turn.get('generation', '')}\n\n" + prompt = ChatPromptTemplate.from_template( "You are an expert IT infrastructure support " "assistant for the Cortex and Sentinel platforms. " - "Use the following retrieved context to answer " + "Use the following conversation history and retrieved context to answer " "the question. If you don't know the answer, " "say so. Be concise and actionable.\n\n" + "Conversation History:\n{history}\n" "Question: {question}\n" "Context: {context}\n" "Answer:" @@ -130,28 +149,42 @@ def generate_node(state: AgentState) -> dict[str, Any]: docs_content = "\n\n".join(doc.page_content for doc in documents) rag_chain = prompt | llm - generation = rag_chain.invoke({"context": docs_content, "question": question}) + generation = await rag_chain.ainvoke( + {"context": docs_content, "question": question, "history": history_str}, + config={"tags": ["generate_answer"]} + ) + + # Update history + new_history = list(history) + new_history.append({"question": question, "generation": str(generation.content)}) + return { "documents": documents, "question": question, - "generation": generation.content, + "generation": str(generation.content), "steps": steps, + "history": new_history, } except Exception as e: logger.error("Generate node failed: %s", e, exc_info=True) + fallback_gen = ( + "I'm sorry, but I encountered an internal system error while " + "generating the answer. Please try again later or consult " + "the system administrators." + ) + history = state.get("history") or [] + new_history = list(history) + new_history.append({"question": question, "generation": fallback_gen}) return { "documents": documents, "question": question, - "generation": ( - "I'm sorry, but I encountered an internal system error while " - "generating the answer. Please try again later or consult " - "the system administrators." - ), + "generation": fallback_gen, "steps": steps + ["error_fallback"], + "history": new_history, } -def direct_response_node(state: AgentState) -> dict[str, Any]: +async def direct_response_node(state: AgentState) -> dict[str, Any]: """ Handle general queries that don't require RAG retrieval. @@ -164,37 +197,58 @@ def direct_response_node(state: AgentState) -> dict[str, Any]: try: llm = get_llm(api_key=state.get("api_key"), provider=state.get("provider")) + + # Build history string + history = state.get("history") or [] + history_str = "" + for turn in history: + history_str += f"User: {turn.get('question', '')}\nAssistant: {turn.get('generation', '')}\n\n" + prompt = ChatPromptTemplate.from_template( "You are a helpful IT support assistant. Answer the following question " - "directly and concisely.\n\n" + "directly and concisely, taking into account the conversation history if relevant.\n\n" + "Conversation History:\n{history}\n" "Question: {question}\n" "Answer:" ) chain = prompt | llm - generation = chain.invoke({"question": question}) + generation = await chain.ainvoke( + {"question": question, "history": history_str}, + config={"tags": ["generate_answer"]} + ) + + # Update history + new_history = list(history) + new_history.append({"question": question, "generation": str(generation.content)}) return { "question": question, - "generation": generation.content, + "generation": str(generation.content), "documents": [], "steps": steps, + "history": new_history, } except Exception as e: logger.error("Direct response node failed: %s", e, exc_info=True) + fallback_gen = ( + "I'm sorry, but I encountered an internal system error while " + "answering your question. Please try again later or consult " + "the system administrators." + ) + history = state.get("history") or [] + new_history = list(history) + new_history.append({"question": question, "generation": fallback_gen}) return { "question": question, - "generation": ( - "I'm sorry, but I encountered an internal system error while " - "answering your question. Please try again later or consult " - "the system administrators." - ), + "generation": fallback_gen, "documents": [], "steps": steps + ["error_fallback"], + "history": new_history, } -def rewrite_query_node(state: AgentState) -> dict[str, Any]: +async def rewrite_query_node(state: AgentState) -> dict[str, Any]: """ Rewrite the query to produce a better vector search. @@ -234,11 +288,11 @@ def rewrite_query_node(state: AgentState) -> dict[str, Any]: ) question_rewriter = re_write_prompt | llm - better_question = question_rewriter.invoke({"question": question}) + better_question = await question_rewriter.ainvoke({"question": question}) return { "documents": state["documents"], - "question": better_question.content, + "question": str(better_question.content), "retry_count": retry_count + 1, "steps": steps, } @@ -252,7 +306,7 @@ def rewrite_query_node(state: AgentState) -> dict[str, Any]: } -def fallback_node(state: AgentState) -> dict[str, Any]: +async def fallback_node(state: AgentState) -> dict[str, Any]: """ Graceful fallback when the rewrite loop is exhausted or error occurs. @@ -276,9 +330,15 @@ def fallback_node(state: AgentState) -> dict[str, Any]: "consult the system administrators for further assistance." ) + # Append to history + history = state.get("history") or [] + new_history = list(history) + new_history.append({"question": question, "generation": generation}) + return { "question": question, "generation": generation, "documents": [], "steps": steps, + "history": new_history, } diff --git a/src/app/graph/router.py b/src/app/graph/router.py index c2956d4..ded0605 100644 --- a/src/app/graph/router.py +++ b/src/app/graph/router.py @@ -30,7 +30,7 @@ class RouteDecision(BaseModel): ) -def router_node(state: AgentState) -> dict[str, Any]: +async def router_node(state: AgentState) -> dict[str, Any]: """ Classify the user query as needing document retrieval or a direct LLM response. @@ -68,12 +68,16 @@ def router_node(state: AgentState) -> dict[str, Any]: ) chain = prompt | structured_llm - result = chain.invoke({"question": question}) + result = await chain.ainvoke({"question": question}) return { "question": question, "route": result.route, # type: ignore[union-attr] "steps": steps, + "retry_count": 0, + "documents": [], + "error": None, + "generation": "", } except Exception as e: logger.error("Router node failed: %s", e, exc_info=True) @@ -82,4 +86,7 @@ def router_node(state: AgentState) -> dict[str, Any]: "route": "fallback", "error": str(e), "steps": steps, + "retry_count": 0, + "documents": [], + "generation": "", } From 44a0c3765695f9fddf7f97d872e2995ad2b4de51 Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:04:11 -0300 Subject: [PATCH 06/18] feat: add tenant_id and session_id to ChatRequest schema and add DocumentUploadResponse --- src/shared/schemas.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/shared/schemas.py b/src/shared/schemas.py index 6b35587..66a8dd1 100644 --- a/src/shared/schemas.py +++ b/src/shared/schemas.py @@ -5,6 +5,8 @@ class ChatRequest(BaseModel): """Incoming chat request.""" query: str = Field(..., description="The user's technical question.") + tenant_id: str | None = Field(default=None, description="Optional tenant identifier for isolated namespace partition.") + session_id: str | None = Field(default=None, description="Optional session identifier for history tracking.") class SourceDocument(BaseModel): @@ -24,3 +26,12 @@ class ChatResponse(BaseModel): default=None, description="Router decision: 'retrieve' (RAG) or 'direct' (general LLM).", ) + + +class DocumentUploadResponse(BaseModel): + """Response returned after uploading and processing a document.""" + + filename: str + chunks_count: int + tenant_id: str | None = None + status: str = "success" From 4cfe694e18cca197e28f0052ed27ace54b1c973c Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:04:33 -0300 Subject: [PATCH 07/18] feat: add post chat/stream sse endpoint and support session checkpointers and cache bypassing --- src/app/api/v1/chat.py | 101 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 90 insertions(+), 11 deletions(-) diff --git a/src/app/api/v1/chat.py b/src/app/api/v1/chat.py index 438bdce..02fb491 100644 --- a/src/app/api/v1/chat.py +++ b/src/app/api/v1/chat.py @@ -1,6 +1,9 @@ +import json import logging +import uuid from fastapi import APIRouter, Depends, Header, HTTPException, Request +from fastapi.responses import StreamingResponse from langgraph.graph.state import CompiledStateGraph from src.app.core.config import settings @@ -50,9 +53,9 @@ async def chat_endpoint( elif x_api_key: api_key = x_api_key - # Check semantic cache if enabled - if settings.enable_semantic_cache: - cached = get_semantic_cache().lookup(payload.query, provider=provider) + # Check semantic cache if enabled and this is a single-turn request (no session_id) + if settings.enable_semantic_cache and not payload.session_id: + cached = get_semantic_cache().lookup(payload.query, provider=provider, tenant_id=payload.tenant_id) if cached: sources = [ SourceDocument(content=src["content"], metadata=src["metadata"]) @@ -70,26 +73,26 @@ async def chat_endpoint( ) try: + # Generate session ID if not provided for checkpoint isolation + session_id = payload.session_id or f"transient-{uuid.uuid4()}" + config = {"configurable": {"thread_id": session_id}} + initial_state = { "question": payload.query, - "generation": "", - "documents": [], - "steps": ["received_query"], - "route": "", - "retry_count": 0, "api_key": api_key, "provider": provider, + "tenant_id": payload.tenant_id, } - result = await workflow.ainvoke(initial_state) + result = await workflow.ainvoke(initial_state, config=config) sources = [ SourceDocument(content=doc.page_content, metadata=doc.metadata) for doc in result.get("documents", []) ] - # Update cache if enabled - if settings.enable_semantic_cache: + # Update cache if enabled and this is a single-turn request + if settings.enable_semantic_cache and not payload.session_id: serialized_sources = [ {"content": src.content, "metadata": src.metadata} for src in sources ] @@ -100,6 +103,7 @@ async def chat_endpoint( steps=result.get("steps", []), route=result.get("route"), provider=provider, + tenant_id=payload.tenant_id, ) return ChatResponse( @@ -111,3 +115,78 @@ async def chat_endpoint( except Exception as e: logger.error("Workflow execution failed", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) from e + + +@router.post("/chat/stream") +@limiter.limit("10/minute") +async def chat_stream_endpoint( + request: Request, + payload: ChatRequest, + workflow: CompiledStateGraph = Depends(get_agent_workflow), + authorization: str | None = Header(default=None), + x_api_key: str | None = Header(default=None), + x_provider: str | None = Header(default=None), +): + """ + Submit a question to the Vortex workflow and stream the response token-by-token via SSE. + """ + provider = x_provider.lower() if x_provider else None + if provider and provider not in ["gemini", "anthropic", "ollama"]: + raise HTTPException( + status_code=400, + detail=( + f"Unsupported LLM provider: {x_provider}. " + "Supported providers: gemini, anthropic, ollama" + ), + ) + + api_key = None + if authorization: + if authorization.startswith("Bearer "): + api_key = authorization[7:] + else: + api_key = authorization + elif x_api_key: + api_key = x_api_key + + # Generate session ID if not provided for checkpoint isolation + session_id = payload.session_id or f"transient-{uuid.uuid4()}" + config = {"configurable": {"thread_id": session_id}} + + initial_state = { + "question": payload.query, + "api_key": api_key, + "provider": provider, + "tenant_id": payload.tenant_id, + } + + async def event_generator(): + try: + final_state = {} + async for event in workflow.astream_events( + initial_state, config=config, version="v2" + ): + kind = event["event"] + + # 1. Yield stream tokens from the generation step + if kind == "on_chat_model_stream" and "generate_answer" in event.get("tags", []): + chunk = event["data"].get("chunk") + if chunk and hasattr(chunk, "content") and chunk.content: + yield f"data: {json.dumps({'event': 'token', 'text': chunk.content})}\n\n" + + # 2. Capture final state + elif kind == "on_chain_end" and event.get("name") == "LangGraph": + final_state = event["data"].get("output", {}) + + # 3. Yield metadata payload at completion + sources = [ + {"content": doc.page_content, "metadata": doc.metadata} + for doc in final_state.get("documents", []) + ] + yield f"data: {json.dumps({\n 'event': 'metadata',\n 'sources': sources,\n 'steps': final_state.get('steps', []),\n 'route': final_state.get('route', ''),\n })}\n\n" + + except Exception as err: + logger.error("Error during streaming generation", exc_info=True) + yield f"data: {json.dumps({'event': 'error', 'text': str(err)})}\n\n" + + return StreamingResponse(event_generator(), media_type="text/event-stream") From ac8c7d131394ceb31f469fdefa9d79fe357a4633 Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:05:07 -0300 Subject: [PATCH 08/18] feat: add api/v1/documents endpoint supporting markdown and PDF upload and ingestion --- src/app/api/v1/documents.py | 113 ++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 src/app/api/v1/documents.py diff --git a/src/app/api/v1/documents.py b/src/app/api/v1/documents.py new file mode 100644 index 0000000..0964cac --- /dev/null +++ b/src/app/api/v1/documents.py @@ -0,0 +1,113 @@ +import io +import logging +import asyncio + +from fastapi import APIRouter, Form, HTTPException, Request, UploadFile +from langchain_core.documents import Document +from langchain_text_splitters import RecursiveCharacterTextSplitter +import pypdf + +from src.app.core.rate_limit import limiter +from src.app.services.vector_store import get_vector_store +from src.shared.schemas import DocumentUploadResponse + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.post("/documents", response_model=DocumentUploadResponse) +@limiter.limit("5/minute") +async def upload_document( + request: Request, + file: UploadFile, + tenant_id: str | None = Form(None), +): + """ + Upload a document (.md or .pdf) to be chunked and indexed into the vector store. + + If tenant_id is provided, the document is indexed into the corresponding tenant collection partition. + """ + filename = file.filename or "unknown" + if not (filename.endswith(".md") or filename.endswith(".pdf")): + raise HTTPException( + status_code=400, + detail="Unsupported file type. Only .md and .pdf files are supported.", + ) + + try: + content_bytes = await file.read() + + if filename.endswith(".md"): + try: + content = content_bytes.decode("utf-8") + except UnicodeDecodeError as decode_err: + logger.error("Failed to decode markdown file as UTF-8", exc_info=True) + raise HTTPException( + status_code=400, + detail="Invalid UTF-8 encoding in Markdown file.", + ) from decode_err + else: # .pdf + try: + reader = pypdf.PdfReader(io.BytesIO(content_bytes)) + pages_text = [] + for page in reader.pages: + text = page.extract_text() + if text: + pages_text.append(text) + content = "\n\n".join(pages_text) + except Exception as pdf_err: + logger.error("Failed to parse PDF file", exc_info=True) + raise HTTPException( + status_code=400, + detail=f"Failed to parse PDF file: {str(pdf_err)}", + ) from pdf_err + + if not content.strip(): + raise HTTPException( + status_code=400, + detail="The uploaded file contains no readable text content.", + ) + + # Wrap text in a LangChain Document + doc = Document(page_content=content, metadata={"source": filename}) + + # Split into chunks using the same settings as CLI ingestion + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=1000, + chunk_overlap=200, + separators=["\n## ", "\n### ", "\n---", "\n\n", "\n", " "], + ) + splits = text_splitter.split_documents([doc]) + + # Add tenant_id metadata to each split for unified tracking/filtering if needed + if tenant_id: + for s in splits: + s.metadata["tenant_id"] = tenant_id + + # Index splits into Chroma DB collection partition asynchronously on a thread pool + vs = get_vector_store() + await asyncio.to_thread(vs.add_documents, splits, tenant_id=tenant_id) + + logger.info( + "Successfully ingested document '%s' with %d chunks [tenant: %s]", + filename, + len(splits), + tenant_id or "default", + ) + + return DocumentUploadResponse( + filename=filename, + chunks_count=len(splits), + tenant_id=tenant_id, + status="success", + ) + + except HTTPException: + raise + except Exception as e: + logger.error("Error during document ingestion endpoint execution", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"An unexpected error occurred during ingestion: {str(e)}", + ) from e From a61ddf1da7dbd01ef5ecb7102a0ffba0b601c312 Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:05:49 -0300 Subject: [PATCH 09/18] feat: register documents router in main.py --- src/app/main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/app/main.py b/src/app/main.py index b985484..53afa13 100644 --- a/src/app/main.py +++ b/src/app/main.py @@ -5,7 +5,7 @@ from slowapi.errors import RateLimitExceeded from starlette.exceptions import HTTPException as StarletteHTTPException -from src.app.api.v1 import chat +from src.app.api.v1 import chat, documents from src.app.core.config import settings from src.app.core.exceptions import ( BaseAPIException, @@ -121,3 +121,4 @@ async def metrics_endpoint(): # Register Routers app.include_router(chat.router, prefix="/api/v1", tags=["chat"]) +app.include_router(documents.router, prefix="/api/v1", tags=["documents"]) From 3a9c8e0acd241e16d490219b0c42572ab60ba41b Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:06:22 -0300 Subject: [PATCH 10/18] chore: implement multi-stage Docker build targeting development stage --- Dockerfile | 19 +++++++++++-------- docker-compose.yml | 5 +++-- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/Dockerfile b/Dockerfile index f0073a5..b7d71a4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.12-slim +FROM python:3.12-slim AS base WORKDIR /app @@ -7,16 +7,19 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential \ && rm -rf /var/lib/apt/lists/* -# Copy project files first, then install COPY pyproject.toml README.md ./ -COPY src/ ./src/ - -# Install the package (non-editable for production) -RUN pip install --no-cache-dir . -# Copy remaining files (data, scripts, etc.) +# ---- Development stage ---- +FROM base AS development +RUN pip install --no-cache-dir -e ".[dev]" COPY . . - EXPOSE 8000 +CMD ["uvicorn", "src.app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"] +# ---- Production stage ---- +FROM base AS production +COPY src/ ./src/ +RUN pip install --no-cache-dir . +COPY . . +EXPOSE 8000 CMD ["uvicorn", "src.app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/docker-compose.yml b/docker-compose.yml index f2d6ad1..d1d8e41 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,15 +1,16 @@ -version: '3.8' - services: api: build: context: . dockerfile: Dockerfile + target: development ports: - "8000:8000" volumes: - ./src:/app/src - ./data:/app/data + - ./tests:/app/tests + - ./pyproject.toml:/app/pyproject.toml env_file: - .env environment: From bfde717b85a47496a5d2f534920eb1fcf49ead2b Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:08:13 -0300 Subject: [PATCH 11/18] chore: add docker targets to Makefile for unified verification inside container --- Makefile | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 93d36aa..6a5a6c4 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: install test lint format run ingest ci docker clean +.PHONY: install test lint format run ingest ci docker clean docker-lint docker-format docker-typecheck docker-test docker-ci # ── Development ───────────────────────────────────────────────────────── @@ -43,6 +43,22 @@ up: down: docker compose down +docker-lint: + docker compose run --rm api ruff check . + +docker-format: + docker compose run --rm api ruff format . + docker compose run --rm api ruff check --fix . + +docker-typecheck: + docker compose run --rm api mypy src/ --ignore-missing-imports + +docker-test: + docker compose run --rm api pytest tests/ -v + +docker-ci: docker-lint docker-typecheck docker-test + @echo "✓ All Docker CI checks passed." + # ── Cleanup ───────────────────────────────────────────────────────────── clean: From 8aedfd0d352e23ad3d2bf5b7dd2806b35394df86 Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:14:03 -0300 Subject: [PATCH 12/18] chore: add langchain-text-splitters explicitly to dependencies --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 94290f8..b659346 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "langgraph>=0.4.0", "langchain>=0.3.0", "langchain-core>=0.3.0", + "langchain-text-splitters>=0.3.0", "langchain-google-genai>=2.0.0", "langchain-chroma>=0.2.0", "langchain-huggingface>=0.1.0", From d563b5eeb84c5376788aa4ab2e59ab5993ab5bbf Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:16:05 -0300 Subject: [PATCH 13/18] chore: add python-multipart to dependencies --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index b659346..a97e084 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ dependencies = [ "slowapi>=0.1.9", "python-json-logger>=2.0.0", "pypdf>=4.0.0", + "python-multipart>=0.0.9", ] [project.optional-dependencies] From b6d4cfde3eed802a8a7584a885b8dec3cfcf384d Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:20:01 -0300 Subject: [PATCH 14/18] test: add test suite v0.5.0 and fix semantic cache lookup filters --- src/app/services/cache.py | 6 +- tests/test_v050.py | 238 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 242 insertions(+), 2 deletions(-) create mode 100644 tests/test_v050.py diff --git a/src/app/services/cache.py b/src/app/services/cache.py index 26d612e..74ef04c 100644 --- a/src/app/services/cache.py +++ b/src/app/services/cache.py @@ -43,8 +43,10 @@ def lookup(self, query: str, provider: str | None = None, tenant_id: str | None try: active_provider = provider or settings.llm_provider filter_dict = { - "provider": active_provider, - "tenant_id": tenant_id or "default", + "$and": [ + {"provider": active_provider}, + {"tenant_id": tenant_id or "default"}, + ] } results = self.vector_store.similarity_search_with_score( diff --git a/tests/test_v050.py b/tests/test_v050.py new file mode 100644 index 0000000..40f83ef --- /dev/null +++ b/tests/test_v050.py @@ -0,0 +1,238 @@ +import io +import json +import shutil +import tempfile +from unittest.mock import AsyncMock, patch + +import pytest +from fastapi.testclient import TestClient +from langchain_core.documents import Document + +from src.app.core.dependencies import get_agent_workflow +from src.app.main import app + + +@pytest.fixture +def temp_chroma_dir(): + """Create a temporary directory for ChromaDB tests.""" + tmpdir = tempfile.mkdtemp() + yield tmpdir + shutil.rmtree(tmpdir, ignore_errors=True) + + +@pytest.fixture +def vector_store(temp_chroma_dir): + """Create a VectorStoreService with a temporary persist directory.""" + with patch("src.app.services.vector_store.settings") as mock_settings: + mock_settings.chroma_persist_directory = temp_chroma_dir + + # Reset the singleton so we get a fresh instance + import src.app.services.vector_store as vs_module + vs_module._instance = None + + from src.app.services.vector_store import VectorStoreService + service = VectorStoreService() + service.persist_directory = temp_chroma_dir + + yield service + + # Clean up singleton + vs_module._instance = None + + +@pytest.fixture +def semantic_cache(temp_chroma_dir): + """Fixture for SemanticCacheService configured to use a temp directory.""" + with ( + patch("src.app.services.cache.settings") as cache_settings, + patch("src.app.services.vector_store.settings") as vs_settings, + ): + cache_settings.chroma_persist_directory = temp_chroma_dir + cache_settings.semantic_cache_threshold = 0.1 + cache_settings.llm_provider = "gemini" + vs_settings.chroma_persist_directory = temp_chroma_dir + + import src.app.services.cache as cache_module + import src.app.services.vector_store as vs_module + + cache_module._cache_instance = None + vs_module._instance = None + + from src.app.services.cache import SemanticCacheService + service = SemanticCacheService() + + yield service + + # Cleanup singletons + cache_module._cache_instance = None + vs_module._instance = None + + +class TestVectorStoreTenantIsolation: + def test_tenant_isolation_and_counting(self, vector_store): + """Verify dynamic collections keep tenants isolated.""" + doc_a = Document(page_content="Tenant A doc content", metadata={"source": "a.md"}) + doc_b = Document(page_content="Tenant B doc content", metadata={"source": "b.md"}) + + vector_store.add_documents([doc_a], tenant_id="tenant-a") + vector_store.add_documents([doc_b], tenant_id="tenant-b") + + # Isolation verification + res_a = vector_store.search("content", tenant_id="tenant-a") + assert len(res_a) == 1 + assert "Tenant A" in res_a[0].page_content + + res_b = vector_store.search("content", tenant_id="tenant-b") + assert len(res_b) == 1 + assert "Tenant B" in res_b[0].page_content + + # Counting isolation + assert vector_store.document_count(tenant_id="tenant-a") == 1 + assert vector_store.document_count(tenant_id="tenant-b") == 1 + + # Clearing isolation + vector_store.clear(tenant_id="tenant-a") + assert vector_store.document_count(tenant_id="tenant-a") == 0 + assert vector_store.document_count(tenant_id="tenant-b") == 1 + + +class TestSemanticCacheTenantIsolation: + def test_cache_tenant_isolation(self, semantic_cache): + """Verify cache hits are isolated by tenant.""" + semantic_cache.update( + query="test query", + answer="Tenant A Answer", + sources=[], + steps=["step1"], + route="direct", + provider="gemini", + tenant_id="tenant-a", + ) + + # Hits on tenant-a + hit_a = semantic_cache.lookup("test query", provider="gemini", tenant_id="tenant-a") + assert hit_a is not None + assert hit_a["answer"] == "Tenant A Answer" + + # Misses on tenant-b + hit_b = semantic_cache.lookup("test query", provider="gemini", tenant_id="tenant-b") + assert hit_b is None + + # Misses on default + hit_def = semantic_cache.lookup("test query", provider="gemini") + assert hit_def is None + + +class TestStreamingAndHistoryEndpoints: + def test_chat_stream_endpoint(self): + """Verify POST /api/v1/chat/stream yields SSE chunks and final metadata.""" + mock_workflow = AsyncMock() + + # Simulate graph events stream + async def mock_astream_events(initial_state, config, version): + # yield model stream token + yield { + "event": "on_chat_model_stream", + "name": "gemini", + "tags": ["generate_answer"], + "data": { + "chunk": AsyncMock(content="Hello") + } + } + yield { + "event": "on_chat_model_stream", + "name": "gemini", + "tags": ["generate_answer"], + "data": { + "chunk": AsyncMock(content=" world") + } + } + # yield workflow end + yield { + "event": "on_chain_end", + "name": "LangGraph", + "data": { + "output": { + "documents": [ + Document(page_content="manual info", metadata={"source": "kb.md"}) + ], + "steps": ["router", "generate_answer"], + "route": "retrieve", + } + } + } + + mock_workflow.astream_events = mock_astream_events + + app.dependency_overrides[get_agent_workflow] = lambda: mock_workflow + client = TestClient(app) + + response = client.post( + "/api/v1/chat/stream", + json={"query": "test query", "tenant_id": "tenant-a", "session_id": "session-1"}, + ) + + assert response.status_code == 200 + assert "text/event-stream" in response.headers["content-type"] + + # Parse SSE stream output + lines = [line.decode("utf-8") if isinstance(line, bytes) else line for line in response.iter_lines() if line] + tokens = [] + metadata = None + + for line in lines: + if line.startswith("data: "): + payload = json.loads(line[6:]) + if payload.get("event") == "token": + tokens.append(payload["text"]) + elif payload.get("event") == "metadata": + metadata = payload + + assert "".join(tokens) == "Hello world" + assert metadata is not None + assert metadata["route"] == "retrieve" + assert metadata["steps"] == ["router", "generate_answer"] + assert len(metadata["sources"]) == 1 + assert metadata["sources"][0]["content"] == "manual info" + + app.dependency_overrides.clear() + + def test_document_ingestion_endpoint(self, vector_store): + """Verify POST /api/v1/documents API chunks and ingests Markdown files.""" + client = TestClient(app) + + # Mock get_vector_store to return our test fixture vector_store + with patch("src.app.api.v1.documents.get_vector_store", return_value=vector_store): + # Test Markdown ingestion + md_content = b"# Title\n\nSome knowledge description.\n\n## Section\n\nMore details." + md_file = io.BytesIO(md_content) + + response = client.post( + "/api/v1/documents", + files={"file": ("manual.md", md_file, "text/markdown")}, + data={"tenant_id": "tenant-xyz"}, + ) + + assert response.status_code == 200 + data = response.json() + assert data["filename"] == "manual.md" + assert data["chunks_count"] > 0 + assert data["tenant_id"] == "tenant-xyz" + assert data["status"] == "success" + + # Check vector store + assert vector_store.document_count(tenant_id="tenant-xyz") > 0 + docs = vector_store.search("details", tenant_id="tenant-xyz") + assert len(docs) > 0 + assert "details" in docs[0].page_content + assert docs[0].metadata["tenant_id"] == "tenant-xyz" + + def test_document_ingestion_unsupported_type(self): + """Verify endpoints reject unsupported file extensions.""" + client = TestClient(app) + response = client.post( + "/api/v1/documents", + files={"file": ("manual.txt", io.BytesIO(b"content"), "text/plain")}, + ) + assert response.status_code == 400 + assert "Only .md and .pdf files are supported" in response.json()["detail"] From 2b33ca7a14d833be4d99486299a6fdedfdc20a27 Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:22:02 -0300 Subject: [PATCH 15/18] test: refactor fallback test to be async --- tests/test_agent_logic.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_agent_logic.py b/tests/test_agent_logic.py index 5aa078c..988f3dd 100644 --- a/tests/test_agent_logic.py +++ b/tests/test_agent_logic.py @@ -105,7 +105,7 @@ def test_falls_back_when_over_limit(self): class TestFallbackNode: - def test_fallback_produces_message(self): + async def test_fallback_produces_message(self): """Fallback node should return a helpful 'not found' message.""" from src.app.graph.nodes import fallback_node @@ -117,7 +117,7 @@ def test_fallback_produces_message(self): "route": "retrieve", "retry_count": 2, } - result = fallback_node(state) + result = await fallback_node(state) assert "knowledge base" in result["generation"].lower() assert result["documents"] == [] assert "fallback" in result["steps"] From 4a50ed6a62b97a081e2a35e8b0679beb2ae052f8 Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:24:13 -0300 Subject: [PATCH 16/18] test: align chaos tests with async workflow changes --- tests/test_chaos.py | 42 +++++++++++++++++------------------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/tests/test_chaos.py b/tests/test_chaos.py index 00ec1b8..6d3985b 100644 --- a/tests/test_chaos.py +++ b/tests/test_chaos.py @@ -1,4 +1,4 @@ -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -21,9 +21,11 @@ async def test_chaos_router_llm_failure(): Test that if the LLM fails during routing, the workflow recovers and redirects to the fallback node. """ - # Mock get_llm to raise an exception when invoked + # Mock get_llm to raise an exception when invoked via structured output ainvoke mock_llm = MagicMock() - mock_llm.with_structured_output.side_effect = RuntimeError("LLM Timeout") + mock_structured = AsyncMock() + mock_structured.ainvoke.side_effect = RuntimeError("LLM Timeout") + mock_llm.with_structured_output.return_value = mock_structured with patch("src.app.graph.router.get_llm", return_value=mock_llm): workflow = get_workflow() @@ -57,9 +59,9 @@ async def test_chaos_chromadb_offline(): mock_router_decision.route = "retrieve" mock_router_llm = MagicMock() - mock_router_llm.with_structured_output.return_value = mock_router_llm - mock_router_llm.invoke.return_value = mock_router_decision - mock_router_llm.return_value = mock_router_decision + mock_structured_router = AsyncMock() + mock_structured_router.ainvoke.return_value = mock_router_decision + mock_router_llm.with_structured_output.return_value = mock_structured_router # 2. Mock Vector Store search to raise ConnectionError mock_vs = MagicMock() @@ -101,9 +103,9 @@ async def test_chaos_generate_llm_failure(): mock_router_decision.route = "retrieve" mock_router_llm = MagicMock() - mock_router_llm.with_structured_output.return_value = mock_router_llm - mock_router_llm.invoke.return_value = mock_router_decision - mock_router_llm.return_value = mock_router_decision + mock_structured_router = AsyncMock() + mock_structured_router.ainvoke.return_value = mock_router_decision + mock_router_llm.with_structured_output.return_value = mock_structured_router # 2. Mock Vector Store to return a document (so we bypass rewrite loop) mock_vs = MagicMock() @@ -117,30 +119,20 @@ async def test_chaos_generate_llm_failure(): mock_grade.binary_score = "yes" mock_grader_llm = MagicMock() - mock_grader_llm.with_structured_output.return_value = mock_grader_llm - mock_grader_llm.invoke.return_value = mock_grade - mock_grader_llm.return_value = mock_grade + mock_structured_grader = AsyncMock() + mock_structured_grader.ainvoke.return_value = mock_grade + mock_grader_llm.with_structured_output.return_value = mock_structured_grader # 4. Mock Generator LLM to raise exception - mock_gen_llm = MagicMock() - mock_gen_llm.invoke.side_effect = RuntimeError("Generation Timeout") - mock_gen_llm.side_effect = RuntimeError("Generation Timeout") - - # We use a custom get_llm patch that returns different LLM mocks based on the node - def get_llm_side_effect(api_key=None, provider=None): - # We can inspect call context, but simpler: return mock_gen_llm if it is called - # for generation, or grader/router mock otherwise. - # Actually, to make it robust, we can mock it per-node. - pass + mock_gen_llm = AsyncMock() + mock_gen_llm.ainvoke.side_effect = RuntimeError("Generation Timeout") with ( patch("src.app.graph.router.get_llm", return_value=mock_router_llm), patch("src.app.graph.nodes.get_vector_store", return_value=mock_vs), patch("src.app.graph.nodes.get_llm") as mock_get_llm, ): - # Side effect logic: - # First call in grade_documents node: returns grader LLM - # Second call in generate node: returns generator LLM (which throws error) + # First call: grader_llm; Second call: generator_llm mock_get_llm.side_effect = [mock_grader_llm, mock_gen_llm] workflow = get_workflow() From 138e71cca0f9e5b58e113a8ce7661cd87195c580 Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 14:31:23 -0300 Subject: [PATCH 17/18] test: use real Document in chaos generation test for serialization --- tests/test_chaos.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/tests/test_chaos.py b/tests/test_chaos.py index 6d3985b..d2713d7 100644 --- a/tests/test_chaos.py +++ b/tests/test_chaos.py @@ -21,9 +21,10 @@ async def test_chaos_router_llm_failure(): Test that if the LLM fails during routing, the workflow recovers and redirects to the fallback node. """ - # Mock get_llm to raise an exception when invoked via structured output ainvoke + # Mock get_llm to raise an exception when invoked via structured output ainvoke or direct call mock_llm = MagicMock() mock_structured = AsyncMock() + mock_structured.side_effect = RuntimeError("LLM Timeout") mock_structured.ainvoke.side_effect = RuntimeError("LLM Timeout") mock_llm.with_structured_output.return_value = mock_structured @@ -38,7 +39,8 @@ async def test_chaos_router_llm_failure(): "retry_count": 0, } - result = await workflow.ainvoke(initial_state) + config = {"configurable": {"thread_id": "chaos_thread_router_fail"}} + result = await workflow.ainvoke(initial_state, config=config) # The workflow must not crash, must return fallback response assert "steps" in result @@ -60,6 +62,7 @@ async def test_chaos_chromadb_offline(): mock_router_llm = MagicMock() mock_structured_router = AsyncMock() + mock_structured_router.return_value = mock_router_decision mock_structured_router.ainvoke.return_value = mock_router_decision mock_router_llm.with_structured_output.return_value = mock_structured_router @@ -81,7 +84,8 @@ async def test_chaos_chromadb_offline(): "retry_count": 0, } - result = await workflow.ainvoke(initial_state) + config = {"configurable": {"thread_id": "chaos_thread_chroma_fail"}} + result = await workflow.ainvoke(initial_state, config=config) # The workflow should route: router -> retrieve -> grade -> fallback assert "steps" in result @@ -104,14 +108,17 @@ async def test_chaos_generate_llm_failure(): mock_router_llm = MagicMock() mock_structured_router = AsyncMock() + mock_structured_router.return_value = mock_router_decision mock_structured_router.ainvoke.return_value = mock_router_decision mock_router_llm.with_structured_output.return_value = mock_structured_router # 2. Mock Vector Store to return a document (so we bypass rewrite loop) + from langchain_core.documents import Document mock_vs = MagicMock() - mock_doc = MagicMock() - mock_doc.page_content = "Sentinel configuration guide." - mock_doc.metadata = {} + mock_doc = Document( + page_content="Sentinel configuration guide.", + metadata={"source": "sentinel_manual.md"} + ) mock_vs.search.return_value = [mock_doc] # 3. Mock Grader LLM to grade document as 'yes' (relevant) @@ -120,11 +127,13 @@ async def test_chaos_generate_llm_failure(): mock_grader_llm = MagicMock() mock_structured_grader = AsyncMock() + mock_structured_grader.return_value = mock_grade mock_structured_grader.ainvoke.return_value = mock_grade mock_grader_llm.with_structured_output.return_value = mock_structured_grader # 4. Mock Generator LLM to raise exception mock_gen_llm = AsyncMock() + mock_gen_llm.side_effect = RuntimeError("Generation Timeout") mock_gen_llm.ainvoke.side_effect = RuntimeError("Generation Timeout") with ( @@ -145,7 +154,8 @@ async def test_chaos_generate_llm_failure(): "retry_count": 0, } - result = await workflow.ainvoke(initial_state) + config = {"configurable": {"thread_id": "chaos_thread_gen_fail"}} + result = await workflow.ainvoke(initial_state, config=config) # The workflow should execute: router -> retrieve -> grade -> generate (fails) assert "steps" in result From bc70d7d591b3f3adc1d0bd9ac975d80791aec000 Mon Sep 17 00:00:00 2001 From: Davi Date: Thu, 4 Jun 2026 22:03:35 -0300 Subject: [PATCH 18/18] docs: update developer guide and readme, fix style formatting, mypy types and mock embeddings --- Makefile | 10 ++++- README.md | 69 ++++++++++++-------------------- docs/architecture.md | 36 +++++++++++------ docs/developer-guide.md | 62 +++++++++++++++++++++++++--- docs/features/caching.md | 13 ++++-- docs/index.md | 32 ++++++++------- mkdocs.yml | 1 + src/app/api/v1/chat.py | 32 ++++++++++----- src/app/api/v1/documents.py | 21 ++++++---- src/app/graph/nodes.py | 29 +++++++++----- src/app/graph/router.py | 5 ++- src/app/graph/state.py | 1 - src/app/graph/workflow.py | 1 - src/app/services/cache.py | 4 +- src/app/services/vector_store.py | 4 +- src/shared/schemas.py | 9 ++++- tests/conftest.py | 49 +++++++++++++++++++++++ tests/test_chaos.py | 6 ++- tests/test_v050.py | 53 ++++++++++++++++-------- tests/test_validation.py | 65 ++++++++++++++++-------------- 20 files changed, 338 insertions(+), 164 deletions(-) create mode 100644 tests/conftest.py diff --git a/Makefile b/Makefile index 6a5a6c4..c995cac 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: install test lint format run ingest ci docker clean docker-lint docker-format docker-typecheck docker-test docker-ci +.PHONY: install test lint format run ingest ci docker clean docker-lint docker-format docker-typecheck docker-test docker-ci docs docker-docs # ── Development ───────────────────────────────────────────────────────── @@ -59,6 +59,14 @@ docker-test: docker-ci: docker-lint docker-typecheck docker-test @echo "✓ All Docker CI checks passed." +# ── Documentation ─────────────────────────────────────────────────────── + +docs: + mkdocs serve + +docker-docs: + docker compose run --rm -p 8001:8001 api mkdocs serve -a 0.0.0.0:8001 + # ── Cleanup ───────────────────────────────────────────────────────────── clean: diff --git a/README.md b/README.md index c1b4b51..04f732d 100644 --- a/README.md +++ b/README.md @@ -60,8 +60,11 @@ graph TD ## 🚀 Key Features * **Self-Corrective RAG (CRAG)**: LangGraph state machine dynamically grades retrieved documentation relevance, filters out noise, and initiates rewrite loops for failed queries. +* **Real-time SSE Streaming**: High-performance Server-Sent Events (SSE) streaming API (`POST /api/v1/chat/stream`) yielding token-by-token generation chunks and final structured references. +* **Multi-Tenant Isolation**: Physical namespace partitioning for ChromaDB collections (`vortex_kb_{tenant_id}`) and dynamic cache lookup isolating tenant context. +* **In-Memory Ingestion API**: Endpoint (`POST /api/v1/documents`) supporting hot-loading of `.md` and `.pdf` files directly into tenant vector stores. * **Bring Your Own Key (BYOK)**: Supports dynamic, request-level API credentials and provider routing via HTTP headers (`Authorization`, `X-API-Key`, `X-Provider`). -* **Zero-Cost Local Semantic Cache**: Persistent ChromaDB-backed similarity cache using a shared local Sentence-Transformers embeddings model. It features provider-level partition isolation to avoid cross-provider context leaks. +* **Zero-Cost Local Semantic Cache**: Persistent ChromaDB-backed similarity cache using a shared local Sentence-Transformers embeddings model. It features provider-level and tenant-level partition isolation to avoid context leaks. * **Chaos Engineering Resilience**: Complete exception shielding across all graph nodes (ChromaDB down, LLM timeouts, grading exceptions) with fast-bypass routing to fallbacks, guaranteeing zero HTTP 500 crashes. * **Model-Agnostic Engine**: Native support for Google Gemini, Anthropic Claude, and local Ollama models with seamless environment-level fallback. * **Observability**: Integrated OpenTelemetry/OpenInference telemetry compatible with Arize Phoenix for step-by-step agent execution tracing. @@ -84,78 +87,56 @@ graph TD ## 📖 Live Documentation Portal -Vortex includes a fully-featured documentation portal built with **MkDocs-Material**. To view the complete architectural guides, step-by-step setups, caching metrics, and API design specifications: +Vortex includes a fully-featured documentation portal built with **MkDocs-Material**: ```bash -# Run local live-reload documentation server -mkdocs serve +make docs # Local live-reload server (requires venv) +make docker-docs # Serve docs inside Docker container ``` -Then visit [http://localhost:8000](http://localhost:8000) in your browser. +Then visit [http://localhost:8001](http://localhost:8001) in your browser. --- ## 🏁 Quick Start -### 1. Installation -Clone the repository and install the development dependencies in editable mode: - ```bash +# 1. Clone and configure git clone https://github.com/soneylegal/vortex.git cd vortex +cp .env.example .env # Fill in your LLM API keys -# Create virtual environment -python3 -m venv venv -source venv/bin/activate - -# Install requirements -pip install -e ".[dev]" -``` - -### 2. Configuration -Copy the configuration template and fill in your model API credentials: -```bash -cp .env.example .env -``` - -### 3. Ingest Documentation -Incorporate the Cortex serverless data pipeline and Sentinel monitoring manuals into the vector database: -```bash -make ingest -``` - -### 4. Run the API Server -Start the FastAPI server: -```bash -make run +# 2. Start the full stack +docker compose up -d ``` * **API Endpoint**: `http://localhost:8000` * **Interactive Swagger UI**: `http://localhost:8000/docs` +* **Phoenix Tracing Dashboard**: `http://localhost:6006` + +> For local development without Docker (venv, pip install, manual server), see the [Developer Guide](docs/developer-guide.md). --- ## 🧪 Development & Quality Checks -Maintain repository standards by executing the local check suite before committing changes: +Maintain repository standards by executing the check suite: +### Local Quality Tools ```bash make lint # Run Ruff code linter -make format # Run Ruff code formatter check +make format # Run Ruff code formatter and check make typecheck # Run Mypy static type verification -make test # Run Pytest suite (including chaos and caching tests) +make test # Run Pytest suite make ci # Run all linting, typechecking, and tests in one command ``` ---- - -## 🐳 Docker Compose (With Observability Stack) - -Spin up the entire stack including the API and the Arize Phoenix tracing UI: - +### 🐳 Docker Quality Tools ```bash -docker compose up -d +make docker-lint # Lint inside container +make docker-format # Format inside container +make docker-typecheck # Typecheck inside container +make docker-test # Run tests inside container +make docker-ci # Run all linting, typechecking, and tests inside container ``` -* **API Portal**: `http://localhost:8000` -* **Phoenix Dashboard**: `http://localhost:6006` --- diff --git a/docs/architecture.md b/docs/architecture.md index 70dbbc9..ee9e025 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -42,33 +42,38 @@ graph TD ## 🧠 State Representation (`AgentState`) -The workflow maintains a single state object, `AgentState` (`TypedDict`), which is passed and updated between nodes: +The workflow maintains a single state object, `AgentState` (`TypedDict`), which is passed and updated between nodes. The graph is compiled with a `MemorySaver` checkpointer to persist state across multi-turn conversations: ```python class AgentState(TypedDict): - question: str # The user's query (potentially rewritten) - generation: str # The final generated response - documents: list[Document] # Retrieved and filtered knowledge documents - steps: list[str] # Execution steps for auditability and tracing - route: str # Router classification ("retrieve", "direct", "fallback") - retry_count: int # Counter protecting against infinite loops - api_key: NotRequired[str] # Dynamic client-provided API key - provider: NotRequired[str] # Selected model provider (gemini, anthropic, ollama) - error: NotRequired[str] # Error message propagated to fallback nodes + question: str # The user's query (potentially rewritten) + generation: str # The final generated response + documents: list[Document] # Retrieved and filtered knowledge documents + steps: list[str] # Execution steps for auditability and tracing + route: str # Router classification ("retrieve", "direct", "fallback") + retry_count: int # Counter protecting against infinite loops + api_key: NotRequired[str] # Dynamic client-provided API key + provider: NotRequired[str] # Selected model provider (gemini, anthropic, ollama) + error: NotRequired[str] # Error message propagated to fallback nodes + tenant_id: NotRequired[str | None] # Tenant namespace for collection isolation + history: NotRequired[list[dict[str, str]]] # Conversation history for multi-turn sessions ``` --- ## 🧱 Workflow Nodes & Operations +All nodes are **fully asynchronous** (`async def`) and use `await` for LLM and I/O operations. + ### 1. Router Node (`router_node`) Classifies incoming queries into technical support questions needing retrieval (`retrieve`) or general conversational questions (`direct`). * **Implementation**: Utilizes structured LLM outputs (`RouteDecision` schema). +* **State Reset**: Clears execution keys (`steps`, `retry_count`, `documents`, `generation`, `error`) at each invocation to prevent carry-over from previous checkpointed turns. * **Error Handling**: If the LLM call fails, the node sets `route` to `"fallback"` and populates the `error` state. ### 2. Retrieve Node (`retrieve_node`) Queries the local ChromaDB vector store for documents related to the current question. -* **Implementation**: Performs vector search using `HuggingFaceEmbeddings` distance search. +* **Implementation**: Performs async vector search via `asyncio.to_thread`, targeting the tenant-specific collection (`vortex_kb_{tenant_id}`). * **Error Handling**: Captures any connection errors to ChromaDB, clears document state, and sets `error` to trigger fallback routing. ### 3. Grade Documents Node (`grade_documents_node`) @@ -84,7 +89,14 @@ Optimizes the search query when retrieved documents are found irrelevant. ### 5. Generate Node (`generate_node`) Synthesizes the final response using the user's question and relevant retrieved context. * **System Prompt**: Enforces boundaries, instructing the LLM to only answer based on context and state if it doesn't know the answer. +* **Tags**: Generation calls are tagged with `"generate_answer"` for SSE stream filtering. +* **History**: Appends the current turn to the conversation `history` list. + +### 6. Direct Response Node (`direct_response_node`) +Handles general queries that don't require document retrieval. +* **History**: Appends the current turn to the conversation `history` list. -### 6. Fallback Node (`fallback_node`) +### 7. Fallback Node (`fallback_node`) The terminal safety node. Returns a polite, helpful explanation to the user. * **Dual Mode**: Differentiates between a *knowledge base miss* (no docs found after retries) and a *system exception* (e.g., ChromaDB offline, LLM api timeouts). + diff --git a/docs/developer-guide.md b/docs/developer-guide.md index 1451e50..80a994b 100644 --- a/docs/developer-guide.md +++ b/docs/developer-guide.md @@ -96,14 +96,13 @@ Once running, the interactive Swagger documentation is available at: ## 🧪 Running Tests -### Unit & Integration Tests -Vortex is equipped with 26 automated tests verifying routing logic, BYOK headers, semantic cache, vector store, and chaos fallbacks: +### Local Quality Tools +Vortex is equipped with automated tests verifying routing logic, BYOK headers, semantic cache, vector store, and chaos fallbacks: ```bash pytest tests/ -v ``` -### Style & Types Check Ensure style and type alignment using Ruff and Mypy before pushing code: ```bash @@ -117,6 +116,56 @@ ruff format --check . mypy src/ --ignore-missing-imports ``` +### 🐳 Docker Quality Tools +Run the quality suite inside standard development containers: + +```bash +# Lint inside container +make docker-lint + +# Format and check inside container +make docker-format + +# Typecheck inside container +make docker-typecheck + +# Run tests inside container +make docker-test + +# Run full CI inside container +make docker-ci +``` + +--- + +## 👥 Multi-Tenancy & Streaming APIs + +Vortex supports dynamic namespace isolation and real-time streaming: + +### 1. Document Ingestion API (`POST /api/v1/documents`) +Upload PDF or Markdown files isolated under a specific `tenant_id`: + +```bash +curl -X POST http://localhost:8000/api/v1/documents \ + -F "file=@manual.pdf" \ + -F "tenant_id=tenant-alpha" +``` + +This chunks the document and indexes it into the isolated `vortex_kb_tenant-alpha` collection namespace in ChromaDB. + +### 2. Conversation Chat Streaming API (`POST /api/v1/chat/stream`) +Stream responses token-by-token using Server-Sent Events (SSE), maintaining separate session context with `session_id` and isolating data via `tenant_id`: + +```bash +curl -X POST http://localhost:8000/api/v1/chat/stream \ + -H "Content-Type: application/json" \ + -d '{ + "query": "How do I configure Sentinel?", + "tenant_id": "tenant-alpha", + "session_id": "session-123" + }' +``` + --- ## 📚 Building the Documentation Portal @@ -124,11 +173,12 @@ mypy src/ --ignore-missing-imports To edit and preview this documentation portal locally: ```bash -# Run local live-reload server -mkdocs serve +# Via Makefile (recommended) +make docs # Local live-reload server (requires venv) +make docker-docs # Serve docs inside Docker container ``` -Access the preview at [http://127.0.0.1:8000](http://127.0.0.1:8000). +Access the preview at [http://localhost:8001](http://localhost:8001). To compile the documentation into static HTML files (suitable for GitHub Pages): diff --git a/docs/features/caching.md b/docs/features/caching.md index 305b47d..f2bf152 100644 --- a/docs/features/caching.md +++ b/docs/features/caching.md @@ -21,11 +21,16 @@ To maintain a zero-cost stack, the semantic cache does not require Redis or clou --- -## 🔒 Provider Partition Isolation +## 🔒 Provider & Tenant Partition Isolation -To prevent returning a response generated by one model provider to a client executing a different provider, the cache collection partitions entries: -* Entries are tagged with `provider` metadata. -* Lookup queries apply a metadata filter ensuring only matches from the *active provider* are checked. +To prevent returning a response generated by one model provider or tenant to another, the cache collection partitions entries using a compound filter: +* Entries are tagged with `provider` and `tenant_id` metadata. +* Lookup queries apply a `$and` metadata filter ensuring only matches from the *active provider* **and** the *active tenant* are checked. +* If no `tenant_id` is provided, lookups default to the `"default"` namespace. + +### Session-Based Cache Bypassing + +For multi-turn conversations (requests with a `session_id`), the semantic cache is **automatically bypassed**. This prevents stale cached responses from polluting ongoing dialogues where context evolves with each turn. --- diff --git a/docs/index.md b/docs/index.md index 30511bd..16f47f0 100644 --- a/docs/index.md +++ b/docs/index.md @@ -8,25 +8,29 @@ Vortex is engineered to resolve complex customer queries for the *Cortex* server ## 🚀 Key Architectural Highlights -* **Bring Your Own Key (BYOK)**: Empowers clients with request-level dynamic API credentials and seamless fallback options. -* **Multi-Provider Flexibility**: Native compatibility with top-tier foundation models (Google Gemini, Anthropic Claude, and local Ollama nodes). -* **Zero-Cost Local Stack**: Complete development environment running locally with ChromaDB and HuggingFace Sentence-Transformers. -* **Semantic Caching Layer**: Ultra-fast cosine-similarity cache partitioned by LLM provider to bypass expensive LLM token calls. -* **Chaos Engineering Resilience**: Built-in exception shielding on graph nodes and state transitions to survive backend service outages. -* **Modern Observability**: Full instrumentation via OpenInference and OpenTelemetry (compatible with Arize Phoenix). +* **Self-Corrective RAG (CRAG)**: LangGraph state machine dynamically grades retrieved documentation relevance, filters out noise, and initiates rewrite loops for failed queries. +* **Real-time SSE Streaming**: High-performance Server-Sent Events (SSE) streaming API (`POST /api/v1/chat/stream`) yielding token-by-token generation chunks and final structured references. +* **Multi-Tenant Isolation**: Physical namespace partitioning for ChromaDB collections (`vortex_kb_{tenant_id}`) and dynamic cache lookup isolating tenant context. +* **In-Memory Ingestion API**: Endpoint (`POST /api/v1/documents`) supporting hot-loading of `.md` and `.pdf` files directly into tenant vector stores. +* **Bring Your Own Key (BYOK)**: Supports dynamic, request-level API credentials and provider routing via HTTP headers (`Authorization`, `X-API-Key`, `X-Provider`). +* **Zero-Cost Local Semantic Cache**: Persistent ChromaDB-backed similarity cache using a shared local Sentence-Transformers embeddings model. It features provider-level and tenant-level partition isolation to avoid context leaks. +* **Chaos Engineering Resilience**: Complete exception shielding across all graph nodes (ChromaDB down, LLM timeouts, grading exceptions) with fast-bypass routing to fallbacks, guaranteeing zero HTTP 500 crashes. +* **Model-Agnostic Engine**: Native support for Google Gemini, Anthropic Claude, and local Ollama models with seamless environment-level fallback. +* **Observability**: Integrated OpenTelemetry/OpenInference telemetry compatible with Arize Phoenix for step-by-step agent execution tracing. --- ## 🛠️ Technology Stack -Vortex is built with modern, industry-standard technologies: - -* **Orchestration**: [LangGraph](https://github.com/langchain-ai/langgraph) / [LangChain](https://github.com/langchain-ai/langchain) -* **API Framework**: [FastAPI](https://fastapi.tiangolo.com/) (Python 3.12+) -* **Vector Database**: [ChromaDB](https://www.trychroma.com/) (Local persistent storage) -* **Embeddings Model**: Local `sentence-transformers/all-MiniLM-L6-v2` via HuggingFace -* **Build System**: [Hatchling](https://hatch.pypa.io/) & [pip] -* **CI/CD & Release**: Python Semantic Release on GitHub Actions +| Layer | Technology | Description | +| :--- | :--- | :--- | +| **Runtime** | Python 3.12 / 3.13 / 3.14 | High-performance async runtime | +| **Web Framework** | FastAPI (AsyncIO) | ASGI web server for fast API delivery | +| **Agent Engine** | LangGraph & LangChain | Stateful multi-actor graph orchestration | +| **Vector Store** | ChromaDB (Embedded) | Persistent vector index for local documents | +| **Embeddings** | `sentence-transformers/all-MiniLM-L6-v2` | CPU-friendly embeddings for zero cost | +| **Observability** | OpenTelemetry + Arize Phoenix | Distributed tracing and LLM evaluation | +| **Build & Tooling** | Hatchling, Ruff, Mypy, pytest | Standardized modern Python developer experience | --- diff --git a/mkdocs.yml b/mkdocs.yml index 5cb5431..df5d8a3 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -2,6 +2,7 @@ site_name: Vortex site_description: Production-grade Agentic RAG Orchestrator for Technical Support site_author: Davi Laurindo site_url: https://soneylegal.github.io/vortex/ +dev_addr: "127.0.0.1:8001" theme: name: material diff --git a/src/app/api/v1/chat.py b/src/app/api/v1/chat.py index 02fb491..60261c7 100644 --- a/src/app/api/v1/chat.py +++ b/src/app/api/v1/chat.py @@ -1,9 +1,11 @@ import json import logging import uuid +from typing import Any from fastapi import APIRouter, Depends, Header, HTTPException, Request from fastapi.responses import StreamingResponse +from langchain_core.runnables import RunnableConfig from langgraph.graph.state import CompiledStateGraph from src.app.core.config import settings @@ -55,7 +57,9 @@ async def chat_endpoint( # Check semantic cache if enabled and this is a single-turn request (no session_id) if settings.enable_semantic_cache and not payload.session_id: - cached = get_semantic_cache().lookup(payload.query, provider=provider, tenant_id=payload.tenant_id) + cached = get_semantic_cache().lookup( + payload.query, provider=provider, tenant_id=payload.tenant_id + ) if cached: sources = [ SourceDocument(content=src["content"], metadata=src["metadata"]) @@ -75,9 +79,9 @@ async def chat_endpoint( try: # Generate session ID if not provided for checkpoint isolation session_id = payload.session_id or f"transient-{uuid.uuid4()}" - config = {"configurable": {"thread_id": session_id}} + config: RunnableConfig = {"configurable": {"thread_id": session_id}} - initial_state = { + initial_state: dict[str, Any] = { "question": payload.query, "api_key": api_key, "provider": provider, @@ -128,7 +132,8 @@ async def chat_stream_endpoint( x_provider: str | None = Header(default=None), ): """ - Submit a question to the Vortex workflow and stream the response token-by-token via SSE. + Submit a question to the Vortex workflow and stream the response + token-by-token via SSE. """ provider = x_provider.lower() if x_provider else None if provider and provider not in ["gemini", "anthropic", "ollama"]: @@ -151,9 +156,9 @@ async def chat_stream_endpoint( # Generate session ID if not provided for checkpoint isolation session_id = payload.session_id or f"transient-{uuid.uuid4()}" - config = {"configurable": {"thread_id": session_id}} + config: RunnableConfig = {"configurable": {"thread_id": session_id}} - initial_state = { + initial_state: dict[str, Any] = { "question": payload.query, "api_key": api_key, "provider": provider, @@ -169,10 +174,13 @@ async def event_generator(): kind = event["event"] # 1. Yield stream tokens from the generation step - if kind == "on_chat_model_stream" and "generate_answer" in event.get("tags", []): + if kind == "on_chat_model_stream" and "generate_answer" in event.get( + "tags", [] + ): chunk = event["data"].get("chunk") if chunk and hasattr(chunk, "content") and chunk.content: - yield f"data: {json.dumps({'event': 'token', 'text': chunk.content})}\n\n" + token_data = {"event": "token", "text": chunk.content} + yield f"data: {json.dumps(token_data)}\n\n" # 2. Capture final state elif kind == "on_chain_end" and event.get("name") == "LangGraph": @@ -183,7 +191,13 @@ async def event_generator(): {"content": doc.page_content, "metadata": doc.metadata} for doc in final_state.get("documents", []) ] - yield f"data: {json.dumps({\n 'event': 'metadata',\n 'sources': sources,\n 'steps': final_state.get('steps', []),\n 'route': final_state.get('route', ''),\n })}\n\n" + metadata_payload = { + "event": "metadata", + "sources": sources, + "steps": final_state.get("steps", []), + "route": final_state.get("route", ""), + } + yield f"data: {json.dumps(metadata_payload)}\n\n" except Exception as err: logger.error("Error during streaming generation", exc_info=True) diff --git a/src/app/api/v1/documents.py b/src/app/api/v1/documents.py index 0964cac..587df0e 100644 --- a/src/app/api/v1/documents.py +++ b/src/app/api/v1/documents.py @@ -1,11 +1,11 @@ +import asyncio import io import logging -import asyncio +import pypdf from fastapi import APIRouter, Form, HTTPException, Request, UploadFile from langchain_core.documents import Document from langchain_text_splitters import RecursiveCharacterTextSplitter -import pypdf from src.app.core.rate_limit import limiter from src.app.services.vector_store import get_vector_store @@ -24,9 +24,11 @@ async def upload_document( tenant_id: str | None = Form(None), ): """ - Upload a document (.md or .pdf) to be chunked and indexed into the vector store. - - If tenant_id is provided, the document is indexed into the corresponding tenant collection partition. + Upload a document (.md or .pdf) to be chunked and indexed into the + vector store. + + If tenant_id is provided, the document is indexed into the corresponding + tenant collection partition. """ filename = file.filename or "unknown" if not (filename.endswith(".md") or filename.endswith(".pdf")): @@ -37,7 +39,7 @@ async def upload_document( try: content_bytes = await file.read() - + if filename.endswith(".md"): try: content = content_bytes.decode("utf-8") @@ -85,7 +87,8 @@ async def upload_document( for s in splits: s.metadata["tenant_id"] = tenant_id - # Index splits into Chroma DB collection partition asynchronously on a thread pool + # Index splits into Chroma DB collection partition asynchronously + # on a thread pool vs = get_vector_store() await asyncio.to_thread(vs.add_documents, splits, tenant_id=tenant_id) @@ -106,7 +109,9 @@ async def upload_document( except HTTPException: raise except Exception as e: - logger.error("Error during document ingestion endpoint execution", exc_info=True) + logger.error( + "Error during document ingestion endpoint execution", exc_info=True + ) raise HTTPException( status_code=500, detail=f"An unexpected error occurred during ingestion: {str(e)}", diff --git a/src/app/graph/nodes.py b/src/app/graph/nodes.py index b09c16a..63cd056 100644 --- a/src/app/graph/nodes.py +++ b/src/app/graph/nodes.py @@ -127,12 +127,14 @@ async def generate_node(state: AgentState) -> dict[str, Any]: try: llm = get_llm(api_key=state.get("api_key"), provider=state.get("provider")) - + # Build history string history = state.get("history") or [] history_str = "" for turn in history: - history_str += f"User: {turn.get('question', '')}\nAssistant: {turn.get('generation', '')}\n\n" + q = turn.get("question", "") + g = turn.get("generation", "") + history_str += f"User: {q}\nAssistant: {g}\n\n" prompt = ChatPromptTemplate.from_template( "You are an expert IT infrastructure support " @@ -151,12 +153,14 @@ async def generate_node(state: AgentState) -> dict[str, Any]: generation = await rag_chain.ainvoke( {"context": docs_content, "question": question, "history": history_str}, - config={"tags": ["generate_answer"]} + config={"tags": ["generate_answer"]}, ) # Update history new_history = list(history) - new_history.append({"question": question, "generation": str(generation.content)}) + new_history.append( + {"question": question, "generation": str(generation.content)} + ) return { "documents": documents, @@ -197,16 +201,19 @@ async def direct_response_node(state: AgentState) -> dict[str, Any]: try: llm = get_llm(api_key=state.get("api_key"), provider=state.get("provider")) - + # Build history string history = state.get("history") or [] history_str = "" for turn in history: - history_str += f"User: {turn.get('question', '')}\nAssistant: {turn.get('generation', '')}\n\n" + q = turn.get("question", "") + g = turn.get("generation", "") + history_str += f"User: {q}\nAssistant: {g}\n\n" prompt = ChatPromptTemplate.from_template( - "You are a helpful IT support assistant. Answer the following question " - "directly and concisely, taking into account the conversation history if relevant.\n\n" + "You are a helpful IT support assistant. Answer the following " + "question directly and concisely, taking into account the " + "conversation history if relevant.\n\n" "Conversation History:\n{history}\n" "Question: {question}\n" "Answer:" @@ -215,12 +222,14 @@ async def direct_response_node(state: AgentState) -> dict[str, Any]: chain = prompt | llm generation = await chain.ainvoke( {"question": question, "history": history_str}, - config={"tags": ["generate_answer"]} + config={"tags": ["generate_answer"]}, ) # Update history new_history = list(history) - new_history.append({"question": question, "generation": str(generation.content)}) + new_history.append( + {"question": question, "generation": str(generation.content)} + ) return { "question": question, diff --git a/src/app/graph/router.py b/src/app/graph/router.py index ded0605..a2aa222 100644 --- a/src/app/graph/router.py +++ b/src/app/graph/router.py @@ -38,8 +38,9 @@ async def router_node(state: AgentState) -> dict[str, Any]: stored in state["route"] for downstream conditional edges and API observability. """ question = state["question"] - steps = list(state.get("steps", [])) - steps.append("router") + # Reset steps for each new invocation to prevent carry-over + # from previous turns when using MemorySaver checkpointer + steps = ["router"] try: llm = get_llm(api_key=state.get("api_key"), provider=state.get("provider")) diff --git a/src/app/graph/state.py b/src/app/graph/state.py index 57b9b21..627f213 100644 --- a/src/app/graph/state.py +++ b/src/app/graph/state.py @@ -29,4 +29,3 @@ class AgentState(TypedDict): error: NotRequired[str | None] tenant_id: NotRequired[str | None] history: NotRequired[list[dict[str, str]] | None] - diff --git a/src/app/graph/workflow.py b/src/app/graph/workflow.py index ebe2078..c6fde50 100644 --- a/src/app/graph/workflow.py +++ b/src/app/graph/workflow.py @@ -29,7 +29,6 @@ from src.app.graph.router import router_node from src.app.graph.state import AgentState - # ── Conditional edge functions ────────────────────────────────────────── diff --git a/src/app/services/cache.py b/src/app/services/cache.py index 74ef04c..12306ae 100644 --- a/src/app/services/cache.py +++ b/src/app/services/cache.py @@ -34,7 +34,9 @@ def __init__(self): ) self.distance_threshold = settings.semantic_cache_threshold - def lookup(self, query: str, provider: str | None = None, tenant_id: str | None = None) -> dict | None: + def lookup( + self, query: str, provider: str | None = None, tenant_id: str | None = None + ) -> dict | None: """ Lookup a query in the cache. Matches semantically similar queries with cosine distance below the configured threshold. diff --git a/src/app/services/vector_store.py b/src/app/services/vector_store.py index 43c2e52..e305065 100644 --- a/src/app/services/vector_store.py +++ b/src/app/services/vector_store.py @@ -28,7 +28,9 @@ def _get_collection(self, tenant_id: str | None = None) -> Chroma: ) return self._collections[collection_name] - def search(self, query: str, k: int = 4, tenant_id: str | None = None) -> list[Document]: + def search( + self, query: str, k: int = 4, tenant_id: str | None = None + ) -> list[Document]: """Search the vector store for relevant documents.""" db = self._get_collection(tenant_id) return db.similarity_search(query, k=k) diff --git a/src/shared/schemas.py b/src/shared/schemas.py index 66a8dd1..b13664d 100644 --- a/src/shared/schemas.py +++ b/src/shared/schemas.py @@ -5,8 +5,13 @@ class ChatRequest(BaseModel): """Incoming chat request.""" query: str = Field(..., description="The user's technical question.") - tenant_id: str | None = Field(default=None, description="Optional tenant identifier for isolated namespace partition.") - session_id: str | None = Field(default=None, description="Optional session identifier for history tracking.") + tenant_id: str | None = Field( + default=None, + description="Optional tenant identifier for isolated namespace partition.", + ) + session_id: str | None = Field( + default=None, description="Optional session identifier for history tracking." + ) class SourceDocument(BaseModel): diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..6249397 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,49 @@ +import hashlib +from unittest.mock import patch + +import numpy as np +import pytest +from langchain_core.embeddings import Embeddings + + +class MockEmbeddings(Embeddings): + def __init__(self, size: int = 384): + self.size = size + + def embed_query(self, text: str) -> list[float]: + vector = np.zeros(self.size) + text_lower = text.lower() + if "cortex" in text_lower: + vector[0] += 1.0 + if "503" in text_lower: + vector[1] += 1.0 + if "lambda" in text_lower: + vector[2] += 1.0 + if "error" in text_lower: + vector[3] += 1.0 + if "test query" in text_lower: + vector[4] += 1.0 + if "content" in text_lower: + vector[5] += 1.0 + if "sentinel" in text_lower: + vector[6] += 1.0 + + h = int(hashlib.md5(text.encode("utf-8")).hexdigest(), 16) + idx = 7 + (h % (self.size - 7)) + vector[idx] += 0.1 + + norm = np.linalg.norm(vector) + if norm > 0: + vector = vector / norm + return vector.tolist() + + def embed_documents(self, texts: list[str]) -> list[list[float]]: + return [self.embed_query(text) for text in texts] + + +@pytest.fixture(autouse=True, scope="session") +def mock_huggingface_embeddings(): + """Mock HuggingFaceEmbeddings to use MockEmbeddings in all tests.""" + with patch("src.app.services.vector_store.HuggingFaceEmbeddings") as mock_hf: + mock_hf.return_value = MockEmbeddings(size=384) + yield diff --git a/tests/test_chaos.py b/tests/test_chaos.py index d2713d7..0804dbe 100644 --- a/tests/test_chaos.py +++ b/tests/test_chaos.py @@ -21,7 +21,8 @@ async def test_chaos_router_llm_failure(): Test that if the LLM fails during routing, the workflow recovers and redirects to the fallback node. """ - # Mock get_llm to raise an exception when invoked via structured output ainvoke or direct call + # Mock get_llm to raise an exception when invoked via structured output + # ainvoke or direct call mock_llm = MagicMock() mock_structured = AsyncMock() mock_structured.side_effect = RuntimeError("LLM Timeout") @@ -114,10 +115,11 @@ async def test_chaos_generate_llm_failure(): # 2. Mock Vector Store to return a document (so we bypass rewrite loop) from langchain_core.documents import Document + mock_vs = MagicMock() mock_doc = Document( page_content="Sentinel configuration guide.", - metadata={"source": "sentinel_manual.md"} + metadata={"source": "sentinel_manual.md"}, ) mock_vs.search.return_value = [mock_doc] diff --git a/tests/test_v050.py b/tests/test_v050.py index 40f83ef..a78b743 100644 --- a/tests/test_v050.py +++ b/tests/test_v050.py @@ -28,9 +28,11 @@ def vector_store(temp_chroma_dir): # Reset the singleton so we get a fresh instance import src.app.services.vector_store as vs_module + vs_module._instance = None from src.app.services.vector_store import VectorStoreService + service = VectorStoreService() service.persist_directory = temp_chroma_dir @@ -59,6 +61,7 @@ def semantic_cache(temp_chroma_dir): vs_module._instance = None from src.app.services.cache import SemanticCacheService + service = SemanticCacheService() yield service @@ -71,8 +74,12 @@ def semantic_cache(temp_chroma_dir): class TestVectorStoreTenantIsolation: def test_tenant_isolation_and_counting(self, vector_store): """Verify dynamic collections keep tenants isolated.""" - doc_a = Document(page_content="Tenant A doc content", metadata={"source": "a.md"}) - doc_b = Document(page_content="Tenant B doc content", metadata={"source": "b.md"}) + doc_a = Document( + page_content="Tenant A doc content", metadata={"source": "a.md"} + ) + doc_b = Document( + page_content="Tenant B doc content", metadata={"source": "b.md"} + ) vector_store.add_documents([doc_a], tenant_id="tenant-a") vector_store.add_documents([doc_b], tenant_id="tenant-b") @@ -110,12 +117,16 @@ def test_cache_tenant_isolation(self, semantic_cache): ) # Hits on tenant-a - hit_a = semantic_cache.lookup("test query", provider="gemini", tenant_id="tenant-a") + hit_a = semantic_cache.lookup( + "test query", provider="gemini", tenant_id="tenant-a" + ) assert hit_a is not None assert hit_a["answer"] == "Tenant A Answer" # Misses on tenant-b - hit_b = semantic_cache.lookup("test query", provider="gemini", tenant_id="tenant-b") + hit_b = semantic_cache.lookup( + "test query", provider="gemini", tenant_id="tenant-b" + ) assert hit_b is None # Misses on default @@ -135,17 +146,13 @@ async def mock_astream_events(initial_state, config, version): "event": "on_chat_model_stream", "name": "gemini", "tags": ["generate_answer"], - "data": { - "chunk": AsyncMock(content="Hello") - } + "data": {"chunk": AsyncMock(content="Hello")}, } yield { "event": "on_chat_model_stream", "name": "gemini", "tags": ["generate_answer"], - "data": { - "chunk": AsyncMock(content=" world") - } + "data": {"chunk": AsyncMock(content=" world")}, } # yield workflow end yield { @@ -154,12 +161,14 @@ async def mock_astream_events(initial_state, config, version): "data": { "output": { "documents": [ - Document(page_content="manual info", metadata={"source": "kb.md"}) + Document( + page_content="manual info", metadata={"source": "kb.md"} + ) ], "steps": ["router", "generate_answer"], "route": "retrieve", } - } + }, } mock_workflow.astream_events = mock_astream_events @@ -169,14 +178,22 @@ async def mock_astream_events(initial_state, config, version): response = client.post( "/api/v1/chat/stream", - json={"query": "test query", "tenant_id": "tenant-a", "session_id": "session-1"}, + json={ + "query": "test query", + "tenant_id": "tenant-a", + "session_id": "session-1", + }, ) assert response.status_code == 200 assert "text/event-stream" in response.headers["content-type"] # Parse SSE stream output - lines = [line.decode("utf-8") if isinstance(line, bytes) else line for line in response.iter_lines() if line] + lines = [ + line.decode("utf-8") if isinstance(line, bytes) else line + for line in response.iter_lines() + if line + ] tokens = [] metadata = None @@ -202,9 +219,13 @@ def test_document_ingestion_endpoint(self, vector_store): client = TestClient(app) # Mock get_vector_store to return our test fixture vector_store - with patch("src.app.api.v1.documents.get_vector_store", return_value=vector_store): + with patch( + "src.app.api.v1.documents.get_vector_store", return_value=vector_store + ): # Test Markdown ingestion - md_content = b"# Title\n\nSome knowledge description.\n\n## Section\n\nMore details." + md_content = ( + b"# Title\n\nSome knowledge description.\n\n## Section\n\nMore details." + ) md_file = io.BytesIO(md_content) response = client.post( diff --git a/tests/test_validation.py b/tests/test_validation.py index e5d1cad..1a1ecb9 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -1,4 +1,4 @@ -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest from langchain_core.documents import Document @@ -29,10 +29,12 @@ async def test_validation_successful_rag_path(): # 1. Mock router to route to 'retrieve' mock_router_decision = MagicMock() mock_router_decision.route = "retrieve" + mock_router_llm = MagicMock() - mock_router_llm.with_structured_output.return_value = mock_router_llm - mock_router_llm.invoke.return_value = mock_router_decision - mock_router_llm.return_value = mock_router_decision + mock_structured_router = AsyncMock() + mock_structured_router.return_value = mock_router_decision + mock_structured_router.ainvoke.return_value = mock_router_decision + mock_router_llm.with_structured_output.return_value = mock_structured_router # 2. Mock Vector Store to return a relevant document mock_vs = MagicMock() @@ -45,17 +47,19 @@ async def test_validation_successful_rag_path(): # 3. Mock Grader LLM to grade document as 'yes' (relevant) mock_grade = MagicMock() mock_grade.binary_score = "yes" + mock_grader_llm = MagicMock() - mock_grader_llm.with_structured_output.return_value = mock_grader_llm - mock_grader_llm.invoke.return_value = mock_grade - mock_grader_llm.return_value = mock_grade + mock_structured_grader = AsyncMock() + mock_structured_grader.return_value = mock_grade + mock_structured_grader.ainvoke.return_value = mock_grade + mock_grader_llm.with_structured_output.return_value = mock_structured_grader # 4. Mock Generator LLM to return generation answer mock_gen_response = MagicMock() mock_gen_response.content = "To fix Cortex Error 503, increase concurrency limits." - mock_gen_llm = MagicMock() - mock_gen_llm.invoke.return_value = mock_gen_response + mock_gen_llm = AsyncMock() mock_gen_llm.return_value = mock_gen_response + mock_gen_llm.ainvoke.return_value = mock_gen_response with ( patch("src.app.graph.router.get_llm", return_value=mock_router_llm), @@ -74,7 +78,8 @@ async def test_validation_successful_rag_path(): "retry_count": 0, } - result = await workflow.ainvoke(initial_state) + config = {"configurable": {"thread_id": "test_thread_rag"}} + result = await workflow.ainvoke(initial_state, config=config) # Assert full path: router -> retrieve -> grade -> generate assert "router" in result["steps"] @@ -103,10 +108,12 @@ async def test_validation_correction_rewrite_loop(): # 1. Mock router to route to 'retrieve' mock_router_decision = MagicMock() mock_router_decision.route = "retrieve" + mock_router_llm = MagicMock() - mock_router_llm.with_structured_output.return_value = mock_router_llm - mock_router_llm.invoke.return_value = mock_router_decision - mock_router_llm.return_value = mock_router_decision + mock_structured_router = AsyncMock() + mock_structured_router.return_value = mock_router_decision + mock_structured_router.ainvoke.return_value = mock_router_decision + mock_router_llm.with_structured_output.return_value = mock_structured_router # 2. Mock Vector Store to return document mock_vs = MagicMock() @@ -117,7 +124,6 @@ async def test_validation_correction_rewrite_loop(): page_content="Sentinel restart details.", metadata={"source": "sentinel_manual.md"}, ) - # Search is called twice: first returns irrelevant, second returns relevant mock_vs.search.side_effect = [[mock_doc_irrelevant], [mock_doc_relevant]] # 3. Mock Grader LLM (returns 'no' first, then 'yes') @@ -125,40 +131,38 @@ async def test_validation_correction_rewrite_loop(): mock_grade_no.binary_score = "no" mock_grade_yes = MagicMock() mock_grade_yes.binary_score = "yes" + mock_grader_llm1 = MagicMock() - mock_grader_llm1.with_structured_output.return_value = mock_grader_llm1 - mock_grader_llm1.invoke.return_value = mock_grade_no - mock_grader_llm1.return_value = mock_grade_no + mock_structured_grader1 = AsyncMock() + mock_structured_grader1.return_value = mock_grade_no + mock_structured_grader1.ainvoke.return_value = mock_grade_no + mock_grader_llm1.with_structured_output.return_value = mock_structured_grader1 mock_grader_llm2 = MagicMock() - mock_grader_llm2.with_structured_output.return_value = mock_grader_llm2 - mock_grader_llm2.invoke.return_value = mock_grade_yes - mock_grader_llm2.return_value = mock_grade_yes + mock_structured_grader2 = AsyncMock() + mock_structured_grader2.return_value = mock_grade_yes + mock_structured_grader2.ainvoke.return_value = mock_grade_yes + mock_grader_llm2.with_structured_output.return_value = mock_structured_grader2 # 4. Mock Rewrite LLM mock_rewritten_query = MagicMock() mock_rewritten_query.content = "rewritten query" - mock_rewrite_llm = MagicMock() - mock_rewrite_llm.invoke.return_value = mock_rewritten_query + mock_rewrite_llm = AsyncMock() mock_rewrite_llm.return_value = mock_rewritten_query + mock_rewrite_llm.ainvoke.return_value = mock_rewritten_query # 5. Mock Generator LLM mock_gen_response = MagicMock() mock_gen_response.content = "Restart Sentinel." - mock_gen_llm = MagicMock() - mock_gen_llm.invoke.return_value = mock_gen_response + mock_gen_llm = AsyncMock() mock_gen_llm.return_value = mock_gen_response + mock_gen_llm.ainvoke.return_value = mock_gen_response with ( patch("src.app.graph.router.get_llm", return_value=mock_router_llm), patch("src.app.graph.nodes.get_vector_store", return_value=mock_vs), patch("src.app.graph.nodes.get_llm") as mock_get_llm, ): - # Sequence of get_llm calls inside nodes: - # 1. grade_documents (mock_grader_llm1) - # 2. rewrite_query (mock_rewrite_llm) - # 3. grade_documents (mock_grader_llm2) - # 4. generate (mock_gen_llm) mock_get_llm.side_effect = [ mock_grader_llm1, mock_rewrite_llm, @@ -176,7 +180,8 @@ async def test_validation_correction_rewrite_loop(): "retry_count": 0, } - result = await workflow.ainvoke(initial_state) + config = {"configurable": {"thread_id": "test_thread_rewrite"}} + result = await workflow.ainvoke(initial_state, config=config) # Assert full path: # router -> retrieve -> grade -> rewrite -> retrieve -> grade -> generate