Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/ci-secret.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ jobs:
run: |
make docker-up

- name: Wait for graph readiness
  run: |
    # Poll the readiness endpoint every 10 seconds, at most max_attempts times.
    # The attempt count is kept in one variable so the progress message and
    # the loop bound cannot drift apart.
    max_attempts=360
    ready=false
    echo "Waiting for graph to finish initializing..."
    for i in $(seq 1 "$max_attempts"); do
      if curl -sf http://localhost:8000/conversations/ready | grep -q '"ready"'; then
        echo "Graph is ready"
        ready=true
        break
      fi
      echo "Waiting for graph initialization... ($i/$max_attempts)"
      sleep 10
    done
    # Fail this step on timeout instead of letting the following steps run
    # against a graph that never finished initializing.
    if [ "$ready" != "true" ]; then
      echo "Graph did not become ready after $max_attempts attempts" >&2
      exit 1
    fi

- name: Run LLM CI
id: llm_tests
working-directory: evaluation
Expand Down
5 changes: 4 additions & 1 deletion backend/src/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Initialize the database on startup and start graph background init.

    Before yielding, this calls ``init_database()`` and then
    ``start_graph_init()``. After the ``yield`` resumes it only logs a
    shutdown message.

    Args:
        app: The FastAPI application this lifespan is attached to (unused
            in the body).
    """
    logger.info("Initializing database connection...")
    init_database()
    logger.info("Starting graph initialization in background...")
    # Kept as a function-scope import, exactly as in the original; the
    # conversations router module is therefore only imported when the
    # application starts up.
    from .routers.conversations import start_graph_init

    start_graph_init()
    yield
    logger.info("Shutting down...")

Expand Down
79 changes: 62 additions & 17 deletions backend/src/api/routers/conversations.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import os
import logging
import threading
from dotenv import load_dotenv

from typing import Any
from typing import Any, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException
from langchain_google_vertexai import ChatVertexAI
Expand Down Expand Up @@ -204,17 +205,56 @@ def parse_agent_output(output: list) -> tuple[str, list[ContextSource], list[str
return llm_response, context_sources, tools


# Module-level (import-time) construction of the RetrieverGraph from names
# defined outside this view (llm, embeddings_config, hf_reranker, ...),
# followed immediately by a synchronous initialize() call.
# NOTE(review): this eager initialization appears next to the background-thread
# initialization (_initialize_graph) in the same diff view; presumably it is
# the removed side of the change -- confirm before relying on `rg`.
rg = RetrieverGraph(
llm_model=llm,
embeddings_config=embeddings_config,
reranking_model_name=hf_reranker,
use_cuda=use_cuda,
inbuilt_tool_calling=True,
fast_mode=fast_mode,
debug=debug,
enable_mcp=enable_mcp,
)
rg.initialize()
# Graph instance assigned by _initialize_graph(); None until that assignment
# happens (and again after reset_graph_state_for_testing()).
_rg: Optional[RetrieverGraph] = None
# Set by start_graph_init() just before it starts the initialization thread.
_rg_started = threading.Event()
# Set by _initialize_graph() after graph.initialize() has returned and _rg
# has been assigned; get_graph() and the /ready endpoint read this flag.
_rg_ready = threading.Event()


def get_graph() -> Optional[RetrieverGraph]:
    """Return the module-level graph once the ready flag is set, else None."""
    if not _rg_ready.is_set():
        return None
    return _rg


def _initialize_graph() -> None:
    """Build and initialize the RetrieverGraph (runs in background thread).

    On success the new graph is assigned to the module-level ``_rg`` and then
    ``_rg_ready`` is set, in that order, so a reader that sees the ready flag
    also sees the instance.

    Raises:
        Exception: Any error from constructing or initializing the graph is
            logged with its traceback and re-raised. ``_rg_ready`` stays
            unset in that case, so ``get_graph()`` keeps returning None.
    """
    global _rg
    try:
        graph = RetrieverGraph(
            llm_model=llm,
            embeddings_config=embeddings_config,
            reranking_model_name=hf_reranker,
            use_cuda=use_cuda,
            inbuilt_tool_calling=True,
            fast_mode=fast_mode,
            debug=debug,
            enable_mcp=enable_mcp,
        )
        graph.initialize()
    except Exception:
        # This runs as a thread target, so nothing upstream will catch the
        # error; record it in the application log before propagating,
        # otherwise the readiness probe just reports "initializing" forever
        # with no logged cause.
        logging.getLogger(__name__).exception(
            "Graph initialization failed; readiness will not be reported."
        )
        raise
    _rg = graph
    _rg_ready.set()


def start_graph_init() -> None:
    """Launch graph initialization on a daemon thread unless already started.

    The first call sets ``_rg_started`` and starts a thread running
    ``_initialize_graph``; calls made while the flag is set do nothing.
    """
    if not _rg_started.is_set():
        _rg_started.set()
        worker = threading.Thread(target=_initialize_graph, daemon=True)
        worker.start()


def reset_graph_state_for_testing() -> None:
    """Put the module-level graph state back to its pre-startup values.

    Drops the graph reference, then clears the started and ready flags so a
    test can simulate a process in which initialization has not begun.
    """
    global _rg
    _rg = None
    for flag in (_rg_started, _rg_ready):
        flag.clear()


@router.get("/ready")
async def ready() -> dict[str, str]:
    """Readiness probe: report 'ready' once the ready flag is set, otherwise 'initializing'."""
    status = "ready" if _rg_ready.is_set() else "initializing"
    return {"status": status}


# Module-level dict keyed by UUID; per the annotation, each value is a list of
# str-to-str dicts. NOTE(review): presumably one entry per conversation -- the
# code that populates it is not visible in this view; confirm.
chat_history: dict[UUID, list[dict[str, str]]] = {}
Expand Down Expand Up @@ -283,10 +323,11 @@ async def get_agent_response(
"chat_history": get_history_str(db, conversation_uuid),
}

if rg.graph is not None:
output = list(rg.graph.stream(inputs, stream_mode="updates"))
graph = get_graph()
if graph is not None and graph.graph is not None:
output = list(graph.graph.stream(inputs, stream_mode="updates"))
else:
raise ValueError("RetrieverGraph not initialized.")
raise HTTPException(status_code=503, detail="Graph is still initializing. Please retry shortly.")

llm_response, context_sources, tools = parse_agent_output(output)

Expand Down Expand Up @@ -382,8 +423,9 @@ async def get_response_stream(user_input: UserInput, db: Session | None) -> Any:
current_llm_call_count = 1
chunks: list[str] = []

if rg.graph is not None:
async for event in rg.graph.astream_events(inputs, version="v2"):
graph = get_graph()
if graph is not None and graph.graph is not None:
async for event in graph.graph.astream_events(inputs, version="v2"):
chunk = event["event"]

if chunk == "on_chat_model_end":
Expand All @@ -406,6 +448,9 @@ async def get_response_stream(user_input: UserInput, db: Session | None) -> Any:
if msg:
chunks.append(str(msg))
yield str(msg) + "\n\n"
else:
yield "Error: Graph is still initializing. Please retry shortly.\n\n"
return

urls = list(set(urls))
yield f"Sources: {', '.join(urls)}\n\n"
Expand Down
14 changes: 11 additions & 3 deletions backend/src/vectorstores/faiss.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import time
import logging
from typing import Optional, Union
from dotenv import load_dotenv
Expand Down Expand Up @@ -74,13 +75,13 @@ def faiss_db(self) -> Optional[FAISS]:

@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=2, min=10, max=120),
wait=wait_exponential(multiplier=2, min=60, max=600),
retry=retry_if_exception(
lambda e: "RESOURCE_EXHAUSTED" in str(e) or "429" in str(e)
),
reraise=True,
)
def _add_to_db(self, documents: list[Document]) -> None:
def _embed_and_add(self, documents: list[Document]) -> None:
if self._faiss_db is None:
self._faiss_db = FAISS.from_documents(
documents=documents,
Expand All @@ -90,6 +91,13 @@ def _add_to_db(self, documents: list[Document]) -> None:
else:
self._faiss_db.add_documents(documents)

def _add_to_db(
    self,
    documents: list[Document],
    batch_size: int = 100,
    pause_seconds: float = 1.0,
) -> None:
    """Embed and add ``documents`` in consecutive batches.

    Each batch is passed to ``self._embed_and_add``. A pause is inserted
    between batches but not after the final one.

    Args:
        documents: Documents to add; an empty list results in no calls.
        batch_size: Maximum number of documents per ``_embed_and_add`` call.
        pause_seconds: Seconds to sleep between batches. Defaults to the
            previously hard-coded 1 second.

    Raises:
        ValueError: If ``batch_size`` is less than 1. Previously zero raised
            an opaque ``range()`` error and a negative value silently added
            nothing.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    total = len(documents)
    for start in range(0, total, batch_size):
        end = start + batch_size
        self._embed_and_add(documents[start:end])
        # Only pause when another batch follows.
        # NOTE(review): presumably this spacing is meant to ease API rate
        # limits (the retry on _embed_and_add targets 429/RESOURCE_EXHAUSTED)
        # -- confirm.
        if end < total:
            time.sleep(pause_seconds)

def add_md_docs(
self, folder_paths: list[str], chunk_size: int = 500, return_docs: bool = False
) -> Optional[list[Document]]:
Expand Down Expand Up @@ -229,7 +237,7 @@ def get_documents(self) -> list[Document]:

@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=2, min=10, max=120),
wait=wait_exponential(multiplier=2, min=60, max=600),
retry=retry_if_exception(
lambda e: "RESOURCE_EXHAUSTED" in str(e) or "429" in str(e)
),
Expand Down
21 changes: 13 additions & 8 deletions backend/tests/test_api_conversations_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ def mock_retriever_graph():
"""Mock RetrieverGraph for streaming tests."""
# Reset the mock for each test
mock_graph_global.reset_mock()
# Set up the lazy-loaded graph so get_graph() returns the mock
conversations._rg = mock_rg_instance
conversations._rg_started.set()
conversations._rg_ready.set()
yield mock_graph_global
# Teardown: reset graph state for next test
conversations.reset_graph_state_for_testing()


@pytest.fixture
Expand Down Expand Up @@ -436,16 +442,15 @@ async def test_get_response_stream_graph_not_initialized(
"""Test behavior when graph is not initialized."""
from src.api.routers.conversations import get_response_stream

with patch("src.api.routers.conversations.rg") as mock_rg:
mock_rg.graph = None
# Reset to simulate a fresh process where the graph hasn't been started yet
conversations.reset_graph_state_for_testing()

chunks = []
async for chunk in get_response_stream(sample_user_input, db_session):
chunks.append(chunk)
chunks = []
async for chunk in get_response_stream(sample_user_input, db_session):
chunks.append(chunk)

# When graph is None, streaming continues but produces no content chunks
# Should still have sources line
assert any("Sources:" in c for c in chunks)
# When graph is None, the stream yields an error and returns early
assert any("still initializing" in c for c in chunks)

@pytest.mark.asyncio
async def test_get_response_stream_empty_content(
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ services:
interval: ${HEALTHCHECK_INTERVAL:-30s}
timeout: ${HEALTHCHECK_TIMEOUT:-10s}
retries: ${HEALTHCHECK_RETRIES:-5}
start_period: ${HEALTHCHECK_START_PERIOD:-1200s}
start_period: ${HEALTHCHECK_START_PERIOD:-30s}

frontend:
build:
Expand Down
Loading