Conversation
|
Transferring review comments from @andreitava-uip on #793 that apply to escalation memory (now in this PR):
|
|
Another comment from @andreitava-uip on #793:
|
|
Another comment from @andreitava-uip on #793:
|
8c8b6b1 to
4817a5e
Compare
| folder_key = ( | ||
| sdk.folders.retrieve_folder_key(folder_path) if folder_path else None | ||
| ) | ||
| response = await sdk.memory.escalation_search_async( |
There was a problem hiding this comment.
you should define and use a retriever
| def _get_escalation_memory_space_id( | ||
| resource: AgentEscalationResourceConfig, | ||
| ) -> str | None: | ||
| """Resolve memory space ID from escalation resource extra fields.""" |
There was a problem hiding this comment.
how do you validate these values in the first place? or do you rely on the input being correct? what if access got revoked on the folder between different runs or if the memory spaces got deleted?
There was a problem hiding this comment.
The Temporal backend follows a best-effort with lazy validation pattern here — no upfront validation of memorySpaceId/folderKey. Memory space and folder are resolved only when search/ingest is actually triggered, and all memory operations are wrapped in try/catch:
FetchFromMemory: returnsnullon any error — escalation proceeds normally- Ingest: catches errors, logs them, but still returns the escalation result
- No retry logic — single attempt, any failure is a graceful no-op
(Ref: EscalationToolExecutor.cs lines 642-690 for search, 507-548 for ingest)
Our Python implementation follows the same pattern. We can take up additional validation (e.g. checking for deleted memory spaces or revoked folder access upfront) as an improvement in a separate PR.
Adds memory integration to the escalation tool: - Before creating HITL task: escalation_search_async() checks for cached answer - Cache hit returns cached result immediately, skipping human escalation - After human resolution: escalation_ingest_async() persists outcome - Gated by isAgentMemoryEnabled + memorySpaceId on the escalation resource - Search settings (threshold, searchMode, fieldSettings) read from resource config - Span attributes (fromMemory, savedToMemory) for trace observability Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
4938bc0 to
7bd14ea
Compare
| if not resource.is_agent_memory_enabled: | ||
| return None | ||
| props = getattr(resource, "properties", None) | ||
| if isinstance(props, dict): |
| # Extract completed_by_user before validation drops extra fields | ||
| # Ref: EscalationToolExecutor.cs:514-516 — resolves ReviewedBy email | ||
| _completed_by_user = ( | ||
| result.get("completed_by_user") |
There was a problem hiding this comment.
this seems supect - is your typing correct
| return None | ||
|
|
||
|
|
||
| async def _ingest_escalation_memory( |
There was a problem hiding this comment.
where is parent span ID pased?
|
|
||
| def _get_escalation_memory_settings( | ||
| resource: AgentEscalationResourceConfig, | ||
| ) -> dict[str, Any] | None: |
There was a problem hiding this comment.
this can't return a class?
| @@ -262,6 +319,23 @@ async def create_escalation_task(): | |||
| EscalationAction(outcome_str) if outcome_str else EscalationAction.CONTINUE | |||
| ) | |||
|
|
|||
| # --- Escalation memory: persist outcome for future recall --- | |||
| # Shape must match Temporal backend (EscalationToolExecutor.cs): | |||
| # answer: new { taskResult.Output, taskResult.Outcome } (line 485) | |||
| # attributes: new JsonObject { ["arguments"] = payload.Input.Arguments } (line 503) | |||
| # spanId/traceId/userId: lines 522-526 | |||
| if _memory_space_id: | |||
| span_id, trace_id = get_current_span_and_trace_ids() | |||
| await _ingest_escalation_memory( | |||
| _memory_space_id, | |||
| answer=json.dumps({"output": escalation_output, "outcome": outcome}), | |||
| attributes=json.dumps({"arguments": serialized_data}), | |||
| span_id=span_id, | |||
| trace_id=trace_id, | |||
| user_id=_get_user_email(_completed_by_user), | |||
| folder_path=folder_path, | |||
| ) | |||
|
|
|||
| return { | |||
| "action": escalation_action, | |||
| "output": escalation_output, | |||
| @@ -333,3 +407,162 @@ async def escalation_wrapper( | |||
| tool.set_tool_wrappers(awrapper=escalation_wrapper) | |||
|
|
|||
| return tool | |||
|
|
|||
|
|
|||
| # --- Escalation memory helpers --- | |||
|
|
|||
|
|
|||
| async def _check_escalation_memory_cache( | |||
| memory_space_id: str, | |||
| serialized_input: dict[str, Any], | |||
| folder_path: str | None = None, | |||
| memory_settings: dict[str, Any] | None = None, | |||
| ) -> dict[str, Any] | None: | |||
| """Check escalation memory for a cached answer. | |||
|
|
|||
| SearchSettings (threshold, searchMode) are read from the user's memory | |||
| settings on the escalation resource, matching the Temporal backend's | |||
| BuildMemorySearchRequest (EscalationToolExecutor.cs:714-747). | |||
| result_count is always 1 for escalation memory. | |||
|
|
|||
| Returns the cached result dict if found, None otherwise. | |||
| """ | |||
|
|
|||
| try: | |||
| from uipath.platform.memory import ( | |||
| FieldSettings, | |||
| MemorySearchRequest, | |||
| SearchField, | |||
| SearchMode, | |||
| SearchSettings, | |||
| ) | |||
|
|
|||
| # Read search settings from user's memory config (threshold, searchMode), | |||
| # falling back to defaults. result_count is always 1 for escalation memory. | |||
| # Ref: EscalationToolExecutor.cs BuildMemorySearchRequest (lines 740-743) | |||
| threshold = 0.0 | |||
| search_mode = SearchMode.Hybrid | |||
| field_settings_lookup: dict[str, dict[str, Any]] = {} | |||
| if memory_settings: | |||
| threshold = memory_settings.get("threshold", 0.0) | |||
| mode_str = memory_settings.get("searchMode", "Hybrid") | |||
| search_mode = ( | |||
| SearchMode(mode_str) | |||
| if mode_str in SearchMode.__members__ | |||
| else SearchMode.Hybrid | |||
| ) | |||
| for fs in memory_settings.get("fieldSettings", []): | |||
| if isinstance(fs, dict) and "name" in fs: | |||
| field_settings_lookup[fs["name"]] = fs | |||
|
|
|||
| fields: list[SearchField] = [] | |||
| for k, v in serialized_input.items(): | |||
| if v is None: | |||
| continue | |||
| # When field settings are configured, only include fields with | |||
| # configured weights (matching Temporal backend behavior) | |||
| if field_settings_lookup and k not in field_settings_lookup: | |||
| continue | |||
| settings = FieldSettings() | |||
| if k in field_settings_lookup: | |||
| fs = field_settings_lookup[k] | |||
| settings = FieldSettings(weight=fs.get("weight", 1.0)) | |||
| # key_path must be prefixed with field type (FieldBuilder.cs:15) | |||
| fields.append( | |||
| SearchField( | |||
| key_path=["escalation-input", k], | |||
| value=str(v), | |||
| settings=settings, | |||
| ) | |||
| ) | |||
| if not fields: | |||
| return None | |||
|
|
|||
| request = MemorySearchRequest( | |||
| fields=fields, | |||
| settings=SearchSettings( | |||
| threshold=threshold, | |||
| result_count=1, | |||
| search_mode=search_mode, | |||
| ), | |||
| ) | |||
| sdk = UiPath() | |||
| folder_key = ( | |||
| sdk.folders.retrieve_folder_key(folder_path) if folder_path else None | |||
| ) | |||
| response = await sdk.memory.escalation_search_async( | |||
| memory_space_id=memory_space_id, | |||
| request=request, | |||
| folder_key=folder_key, | |||
| ) | |||
| if response.results and response.results[0].answer: | |||
| cached = response.results[0].answer | |||
| _escalation_logger.info( | |||
| "Escalation memory cache hit for space '%s'", memory_space_id | |||
| ) | |||
| # Ref: EscalationToolWorkflow.cs:103 — span.Attributes.FromMemory = true | |||
| set_span_attribute("fromMemory", True) | |||
| return { | |||
| "action": EscalationAction.CONTINUE, | |||
| "output": cached.output, | |||
| "outcome": cached.outcome, | |||
| } | |||
| except Exception: | |||
| _escalation_logger.warning( | |||
| "Escalation memory search failed for space '%s'", | |||
| memory_space_id, | |||
| exc_info=True, | |||
| ) | |||
|
|
|||
| return None | |||
|
|
|||
|
|
|||
| async def _ingest_escalation_memory( | |||
| memory_space_id: str, | |||
| answer: str, | |||
| attributes: str, | |||
| span_id: str, | |||
| trace_id: str, | |||
| user_id: str | None = None, | |||
| folder_path: str | None = None, | |||
There was a problem hiding this comment.
why can this be None
| "Ingested escalation outcome into memory space '%s'", memory_space_id | ||
| ) | ||
| except Exception: | ||
| set_span_attribute("savedToMemory", False) |
There was a problem hiding this comment.
span needs to be set to error state? error message on span?
Depends on #793
Summary
_check_escalation_memory_cache) before HITL task creation — cache hit skips human escalation_ingest_escalation_memory) after human resolution to persist for future recallisAgentMemoryEnabled+memorySpaceIdon the escalation resource configfromMemory,savedToMemory) for trace observabilitycompleted_by_useremail for memory ingestuserIdTest plan
tests/agent/tools/test_escalation_memory.py— 7 tests for cache check, ingest, and space ID resolutiontests/agent/tools/test_escalation_tool.py— all 37 tests passtests/agent/tools/test_ixp_escalation_tool.py— all 16 tests pass🤖 Generated with Claude Code