diff --git a/.github/workflows/ci-secret.yaml b/.github/workflows/ci-secret.yaml index c8fb7f46..694c666f 100644 --- a/.github/workflows/ci-secret.yaml +++ b/.github/workflows/ci-secret.yaml @@ -63,6 +63,18 @@ jobs: run: | make docker-up + - name: Wait for graph readiness + run: | + echo "Waiting for graph to finish initializing..." + for i in $(seq 1 360); do + if curl -sf http://localhost:8000/conversations/ready | grep -q '"ready"'; then + echo "Graph is ready" + break + fi + echo "Waiting for graph initialization... ($i/360)" + sleep 10 + done + - name: Run LLM CI id: llm_tests working-directory: evaluation diff --git a/backend/src/api/main.py b/backend/src/api/main.py index 8045a314..3ed04a34 100644 --- a/backend/src/api/main.py +++ b/backend/src/api/main.py @@ -11,9 +11,12 @@ @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: - """Initialize database on startup.""" + """Initialize database on startup and start graph background init.""" logger.info("Initializing database connection...") init_database() + logger.info("Starting graph initialization in background...") + from .routers.conversations import start_graph_init + start_graph_init() yield logger.info("Shutting down...") diff --git a/backend/src/api/routers/conversations.py b/backend/src/api/routers/conversations.py index f0450628..f6db750c 100644 --- a/backend/src/api/routers/conversations.py +++ b/backend/src/api/routers/conversations.py @@ -1,8 +1,9 @@ import os import logging +import threading from dotenv import load_dotenv -from typing import Any +from typing import Any, Optional from uuid import UUID from fastapi import APIRouter, Depends, HTTPException from langchain_google_vertexai import ChatVertexAI @@ -204,17 +205,56 @@ def parse_agent_output(output: list) -> tuple[str, list[ContextSource], list[str return llm_response, context_sources, tools -rg = RetrieverGraph( - llm_model=llm, - embeddings_config=embeddings_config, - reranking_model_name=hf_reranker, - 
use_cuda=use_cuda, - inbuilt_tool_calling=True, - fast_mode=fast_mode, - debug=debug, - enable_mcp=enable_mcp, -) -rg.initialize() +_rg: Optional[RetrieverGraph] = None +_rg_started = threading.Event() +_rg_ready = threading.Event() + + +def get_graph() -> Optional[RetrieverGraph]: + """Return the initialized graph, or None if not ready yet.""" + return _rg if _rg_ready.is_set() else None + + +def _initialize_graph() -> None: + """Build and initialize the RetrieverGraph (runs in background thread).""" + global _rg + graph = RetrieverGraph( + llm_model=llm, + embeddings_config=embeddings_config, + reranking_model_name=hf_reranker, + use_cuda=use_cuda, + inbuilt_tool_calling=True, + fast_mode=fast_mode, + debug=debug, + enable_mcp=enable_mcp, + ) + graph.initialize() + _rg = graph + _rg_ready.set() + + +def start_graph_init() -> None: + """Start graph initialization in a background thread (idempotent).""" + if _rg_started.is_set(): + return + _rg_started.set() + threading.Thread(target=_initialize_graph, daemon=True).start() + + +def reset_graph_state_for_testing() -> None: + """Reset graph state so tests can simulate a fresh startup.""" + global _rg + _rg = None + _rg_started.clear() + _rg_ready.clear() + + +@router.get("/ready") +async def ready() -> dict[str, str]: + """Readiness probe — returns 'ready' when the graph is fully initialized.""" + if _rg_ready.is_set(): + return {"status": "ready"} + return {"status": "initializing"} chat_history: dict[UUID, list[dict[str, str]]] = {} @@ -283,10 +323,11 @@ async def get_agent_response( "chat_history": get_history_str(db, conversation_uuid), } - if rg.graph is not None: - output = list(rg.graph.stream(inputs, stream_mode="updates")) + graph = get_graph() + if graph is not None and graph.graph is not None: + output = list(graph.graph.stream(inputs, stream_mode="updates")) else: - raise ValueError("RetrieverGraph not initialized.") + raise HTTPException(status_code=503, detail="Graph is still initializing. 
Please retry shortly.") llm_response, context_sources, tools = parse_agent_output(output) @@ -382,8 +423,9 @@ async def get_response_stream(user_input: UserInput, db: Session | None) -> Any: current_llm_call_count = 1 chunks: list[str] = [] - if rg.graph is not None: - async for event in rg.graph.astream_events(inputs, version="v2"): + graph = get_graph() + if graph is not None and graph.graph is not None: + async for event in graph.graph.astream_events(inputs, version="v2"): chunk = event["event"] if chunk == "on_chat_model_end": @@ -406,6 +448,9 @@ async def get_response_stream(user_input: UserInput, db: Session | None) -> Any: if msg: chunks.append(str(msg)) yield str(msg) + "\n\n" + else: + yield "Error: Graph is still initializing. Please retry shortly.\n\n" + return urls = list(set(urls)) yield f"Sources: {', '.join(urls)}\n\n" diff --git a/backend/src/vectorstores/faiss.py b/backend/src/vectorstores/faiss.py index 8c7c93b3..52cdc15f 100644 --- a/backend/src/vectorstores/faiss.py +++ b/backend/src/vectorstores/faiss.py @@ -1,4 +1,5 @@ import os +import time import logging from typing import Optional, Union from dotenv import load_dotenv @@ -74,13 +75,13 @@ def faiss_db(self) -> Optional[FAISS]: @retry( stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=2, min=10, max=120), + wait=wait_exponential(multiplier=2, min=60, max=600), retry=retry_if_exception( lambda e: "RESOURCE_EXHAUSTED" in str(e) or "429" in str(e) ), reraise=True, ) - def _add_to_db(self, documents: list[Document]) -> None: + def _embed_and_add(self, documents: list[Document]) -> None: if self._faiss_db is None: self._faiss_db = FAISS.from_documents( documents=documents, @@ -90,6 +91,13 @@ def _add_to_db(self, documents: list[Document]) -> None: else: self._faiss_db.add_documents(documents) + def _add_to_db(self, documents: list[Document], batch_size: int = 100) -> None: + for i in range(0, len(documents), batch_size): + batch = documents[i : i + batch_size] + 
self._embed_and_add(batch) + if i + batch_size < len(documents): + time.sleep(1) + def add_md_docs( self, folder_paths: list[str], chunk_size: int = 500, return_docs: bool = False ) -> Optional[list[Document]]: @@ -229,7 +237,7 @@ def get_documents(self) -> list[Document]: @retry( stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=2, min=10, max=120), + wait=wait_exponential(multiplier=2, min=60, max=600), retry=retry_if_exception( lambda e: "RESOURCE_EXHAUSTED" in str(e) or "429" in str(e) ), diff --git a/backend/tests/test_api_conversations_streaming.py b/backend/tests/test_api_conversations_streaming.py index a3a921de..193a250e 100644 --- a/backend/tests/test_api_conversations_streaming.py +++ b/backend/tests/test_api_conversations_streaming.py @@ -37,7 +37,13 @@ def mock_retriever_graph(): """Mock RetrieverGraph for streaming tests.""" # Reset the mock for each test mock_graph_global.reset_mock() + # Set up the lazy-loaded graph so get_graph() returns the mock + conversations._rg = mock_rg_instance + conversations._rg_started.set() + conversations._rg_ready.set() yield mock_graph_global + # Teardown: reset graph state for next test + conversations.reset_graph_state_for_testing() @pytest.fixture @@ -436,16 +442,15 @@ async def test_get_response_stream_graph_not_initialized( """Test behavior when graph is not initialized.""" from src.api.routers.conversations import get_response_stream - with patch("src.api.routers.conversations.rg") as mock_rg: - mock_rg.graph = None + # Reset to simulate a fresh process where the graph hasn't been started yet + conversations.reset_graph_state_for_testing() - chunks = [] - async for chunk in get_response_stream(sample_user_input, db_session): - chunks.append(chunk) + chunks = [] + async for chunk in get_response_stream(sample_user_input, db_session): + chunks.append(chunk) - # When graph is None, streaming continues but produces no content chunks - # Should still have sources line - assert any("Sources:" in c for c 
in chunks) + # When graph is None, the stream yields an error and returns early + assert any("still initializing" in c for c in chunks) @pytest.mark.asyncio async def test_get_response_stream_empty_content( diff --git a/docker-compose.yml b/docker-compose.yml index 110e8571..baeb9205 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -45,7 +45,7 @@ services: interval: ${HEALTHCHECK_INTERVAL:-30s} timeout: ${HEALTHCHECK_TIMEOUT:-10s} retries: ${HEALTHCHECK_RETRIES:-5} - start_period: ${HEALTHCHECK_START_PERIOD:-1200s} + start_period: ${HEALTHCHECK_START_PERIOD:-30s} frontend: build: