Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 234 additions & 1 deletion src/uipath_langchain/agent/tools/escalation_tool.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Escalation tool creation for Action Center integration."""

import json
import logging
from enum import Enum
from typing import Any, Literal

Expand All @@ -23,7 +25,11 @@
from uipath.platform.common import WaitEscalation
from uipath.runtime.errors import UiPathErrorCategory

from uipath_langchain._utils import get_execution_folder_path
from uipath_langchain._utils import (
get_current_span_and_trace_ids,
get_execution_folder_path,
set_span_attribute,
)
from uipath_langchain._utils.durable_interrupt import durable_interrupt
from uipath_langchain.agent.react.jsonschema_pydantic_converter import create_model
from uipath_langchain.agent.tools.structured_tool_with_argument_properties import (
Expand All @@ -39,6 +45,8 @@
sanitize_tool_name,
)

_escalation_logger = logging.getLogger(__name__)


class EscalationAction(str, Enum):
"""Actions that can be taken after an escalation completes."""
Expand Down Expand Up @@ -161,6 +169,35 @@ def _parse_task_data(
return filtered_fields


def _get_escalation_memory_space_id(
resource: AgentEscalationResourceConfig,
) -> str | None:
"""Resolve memory space ID from escalation resource extra fields."""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do you validate these values in the first place? or do you rely on the input being correct? what if access got revoked on the folder between different runs or if the memory spaces got deleted?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Temporal backend follows a best-effort with lazy validation pattern here — no upfront validation of memorySpaceId/folderKey. Memory space and folder are resolved only when search/ingest is actually triggered, and all memory operations are wrapped in try/catch:

  • FetchFromMemory: returns null on any error — escalation proceeds normally
  • Ingest: catches errors, logs them, but still returns the escalation result
  • No retry logic — single attempt, any failure is a graceful no-op

(Ref: EscalationToolExecutor.cs lines 642-690 for search, 507-548 for ingest)

Our Python implementation follows the same pattern. We can take up additional validation (e.g. checking for deleted memory spaces or revoked folder access upfront) as an improvement in a separate PR.

if not resource.is_agent_memory_enabled:
return None
return getattr(resource, "memorySpaceId", None) or getattr(
resource, "memory_space_id", None
)


def _get_escalation_memory_settings(
resource: AgentEscalationResourceConfig,
) -> dict[str, Any] | None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can't return a class?

"""Extract memory settings from escalation resource properties.

Maps to EscalationResourceDefinition.Properties.Memory in the Temporal
backend (backend/Common.Models/AgentExecution/ResourceDefinition.cs:96).
"""
if not resource.is_agent_memory_enabled:
return None
props = getattr(resource, "properties", None)
if isinstance(props, dict):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems suspect

return props.get("memory")
if props is not None:
return getattr(props, "memory", None)
return None


def create_escalation_tool(
resource: AgentEscalationResourceConfig,
) -> StructuredTool:
Expand All @@ -178,6 +215,8 @@ class EscalationToolOutput(BaseModel):
is_deleted: bool = False

_bts_context: dict[str, Any] = {}
_memory_space_id: str | None = _get_escalation_memory_space_id(resource)
_memory_settings: dict[str, Any] | None = _get_escalation_memory_settings(resource)

async def escalation_tool_fn(**kwargs: Any) -> dict[str, Any]:
agent_input: dict[str, Any] = (
Expand All @@ -198,6 +237,17 @@ async def escalation_tool_fn(**kwargs: Any) -> dict[str, Any]:

serialized_data = input_model.model_validate(kwargs).model_dump(mode="json")

# --- Escalation memory: check cache before creating HITL task ---
if _memory_space_id:
cached_result = await _check_escalation_memory_cache(
_memory_space_id,
serialized_data,
folder_path=folder_path,
memory_settings=_memory_settings,
)
if cached_result is not None:
return cached_result

@mockable(
name=tool_name.lower(),
description=resource.description,
Expand Down Expand Up @@ -234,6 +284,13 @@ async def create_escalation_task():
return await create_escalation_task()

result = await escalate(**kwargs)
# Extract completed_by_user before validation drops extra fields
# Ref: EscalationToolExecutor.cs:514-516 — resolves ReviewedBy email
_completed_by_user = (
result.get("completed_by_user")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems supect - is your typing correct

if isinstance(result, dict)
else getattr(result, "completed_by_user", None)
)
if isinstance(result, dict):
result = TypeAdapter(EscalationToolOutput).validate_python(result)

Expand Down Expand Up @@ -262,6 +319,23 @@ async def create_escalation_task():
EscalationAction(outcome_str) if outcome_str else EscalationAction.CONTINUE
)

# --- Escalation memory: persist outcome for future recall ---
# Shape must match Temporal backend (EscalationToolExecutor.cs):
# answer: new { taskResult.Output, taskResult.Outcome } (line 485)
# attributes: new JsonObject { ["arguments"] = payload.Input.Arguments } (line 503)
# spanId/traceId/userId: lines 522-526
if _memory_space_id:
span_id, trace_id = get_current_span_and_trace_ids()
await _ingest_escalation_memory(
_memory_space_id,
answer=json.dumps({"output": escalation_output, "outcome": outcome}),
attributes=json.dumps({"arguments": serialized_data}),
span_id=span_id,
trace_id=trace_id,
user_id=_get_user_email(_completed_by_user),
folder_path=folder_path,
)

return {
"action": escalation_action,
"output": escalation_output,
Expand Down Expand Up @@ -333,3 +407,162 @@ async def escalation_wrapper(
tool.set_tool_wrappers(awrapper=escalation_wrapper)

return tool


# --- Escalation memory helpers ---


async def _check_escalation_memory_cache(
memory_space_id: str,
serialized_input: dict[str, Any],
folder_path: str | None = None,
memory_settings: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
"""Check escalation memory for a cached answer.

SearchSettings (threshold, searchMode) are read from the user's memory
settings on the escalation resource, matching the Temporal backend's
BuildMemorySearchRequest (EscalationToolExecutor.cs:714-747).
result_count is always 1 for escalation memory.

Returns the cached result dict if found, None otherwise.
"""

try:
from uipath.platform.memory import (
FieldSettings,
MemorySearchRequest,
SearchField,
SearchMode,
SearchSettings,
)

# Read search settings from user's memory config (threshold, searchMode),
# falling back to defaults. result_count is always 1 for escalation memory.
# Ref: EscalationToolExecutor.cs BuildMemorySearchRequest (lines 740-743)
threshold = 0.0
search_mode = SearchMode.Hybrid
field_settings_lookup: dict[str, dict[str, Any]] = {}
if memory_settings:
threshold = memory_settings.get("threshold", 0.0)
mode_str = memory_settings.get("searchMode", "Hybrid")
search_mode = (
SearchMode(mode_str)
if mode_str in SearchMode.__members__
else SearchMode.Hybrid
)
for fs in memory_settings.get("fieldSettings", []):
if isinstance(fs, dict) and "name" in fs:
field_settings_lookup[fs["name"]] = fs

fields: list[SearchField] = []
for k, v in serialized_input.items():
if v is None:
continue
# When field settings are configured, only include fields with
# configured weights (matching Temporal backend behavior)
if field_settings_lookup and k not in field_settings_lookup:
continue
settings = FieldSettings()
if k in field_settings_lookup:
fs = field_settings_lookup[k]
settings = FieldSettings(weight=fs.get("weight", 1.0))
# key_path must be prefixed with field type (FieldBuilder.cs:15)
fields.append(
SearchField(
key_path=["escalation-input", k],
value=str(v),
settings=settings,
)
)
if not fields:
return None

request = MemorySearchRequest(
fields=fields,
settings=SearchSettings(
threshold=threshold,
result_count=1,
search_mode=search_mode,
),
)
sdk = UiPath()
folder_key = (
sdk.folders.retrieve_folder_key(folder_path) if folder_path else None
)
response = await sdk.memory.escalation_search_async(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should define and use a retriever

memory_space_id=memory_space_id,
request=request,
folder_key=folder_key,
)
if response.results and response.results[0].answer:
cached = response.results[0].answer
_escalation_logger.info(
"Escalation memory cache hit for space '%s'", memory_space_id
)
# Ref: EscalationToolWorkflow.cs:103 — span.Attributes.FromMemory = true
set_span_attribute("fromMemory", True)
return {
"action": EscalationAction.CONTINUE,
"output": cached.output,
"outcome": cached.outcome,
}
except Exception:
_escalation_logger.warning(
"Escalation memory search failed for space '%s'",
memory_space_id,
exc_info=True,
)

return None


async def _ingest_escalation_memory(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is parent span ID pased?

memory_space_id: str,
answer: str,
attributes: str,
span_id: str,
trace_id: str,
user_id: str | None = None,
folder_path: str | None = None,
Comment on lines 291 to +527
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can this be None

) -> None:
"""Persist a resolved escalation outcome into memory.

Sets span attributes to track memory state (EscalationToolWorkflow.cs:131-133):
fromMemory=false (result was not from cache), savedToMemory=true/false.
"""

# Ref: EscalationToolWorkflow.cs:132 — span.Attributes.FromMemory = false
set_span_attribute("fromMemory", False)

try:
from uipath.platform.memory import EscalationMemoryIngestRequest

request = EscalationMemoryIngestRequest(
span_id=span_id,
trace_id=trace_id,
answer=answer,
attributes=attributes,
user_id=user_id,
)
sdk = UiPath()
folder_key = (
sdk.folders.retrieve_folder_key(folder_path) if folder_path else None
)
await sdk.memory.escalation_ingest_async(
memory_space_id=memory_space_id,
request=request,
folder_key=folder_key,
)
# Ref: EscalationToolExecutor.cs:543 — savedToMemory = true on success
set_span_attribute("savedToMemory", True)
_escalation_logger.info(
"Ingested escalation outcome into memory space '%s'", memory_space_id
)
except Exception:
set_span_attribute("savedToMemory", False)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

span needs to be set to error state? error message on span?

_escalation_logger.warning(
"Failed to ingest escalation outcome into memory space '%s'",
memory_space_id,
exc_info=True,
)
Loading
Loading