Skip to content

Commit a32082a

Browse files
committed
refactor: solve runtime code inconsistencies
1 parent 9dfeb8d commit a32082a

File tree

1 file changed

+53
-66
lines changed

1 file changed

+53
-66
lines changed

src/uipath_mcp/_cli/_runtime/_runtime.py

Lines changed: 53 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,8 @@ async def _run_server(self) -> UiPathRuntimeResult:
295295
)
296296
self._cancel_event.set()
297297
finally:
298-
# Cancel any pending tasks gracefully
299-
for task in [run_task, cancel_task, self._keep_alive_task]:
298+
# Cancel pending tasks
299+
for task in [run_task, cancel_task]:
300300
if task and not task.done():
301301
task.cancel()
302302
try:
@@ -322,7 +322,7 @@ async def _run_server(self) -> UiPathRuntimeResult:
322322
except Exception as e:
323323
if isinstance(e, UiPathMcpRuntimeError):
324324
raise
325-
detail = f"Error: {str(e)}"
325+
detail = f"Error: {e}"
326326
raise UiPathMcpRuntimeError(
327327
UiPathErrorCode.EXECUTION_ERROR,
328328
"MCP Runtime execution failed",
@@ -370,16 +370,17 @@ async def _cleanup(self) -> None:
370370
try:
371371
await transport._ws.close()
372372
except Exception as e:
373-
logger.error(f"Error closing SignalR WebSocket: {str(e)}")
373+
logger.error(f"Error closing SignalR WebSocket: {e}")
374374

375375
# Add a small delay to allow the server to shut down gracefully
376376
if sys.platform == "win32":
377377
await asyncio.sleep(0.5)
378378

379379
async def _handle_signalr_session_closed(self, args: list[str]) -> None:
380-
"""
381-
Handle session closed by server.
382-
"""
380+
"""Handle session closed by server."""
381+
if self._cleanup_done:
382+
return
383+
383384
if len(args) < 1:
384385
logger.error(f"Received invalid websocket message arguments: {args}")
385386
return
@@ -389,10 +390,7 @@ async def _handle_signalr_session_closed(self, args: list[str]) -> None:
389390
await self.remove_session(session_id, reason="server closed")
390391

391392
async def _handle_signalr_message(self, args: list[str]) -> None:
392-
"""
393-
Handle incoming SignalR messages.
394-
"""
395-
393+
"""Handle incoming SignalR messages."""
396394
if self._cleanup_done:
397395
return
398396

@@ -421,7 +419,7 @@ async def _handle_signalr_message(self, args: list[str]) -> None:
421419
await session_server.start()
422420
except Exception as e:
423421
logger.error(
424-
f"Error starting session server for session {session_id}: {str(e)}"
422+
f"Error starting session server for session {session_id}: {e}"
425423
)
426424
await self._on_session_start_error(session_id)
427425
raise
@@ -435,7 +433,7 @@ async def _handle_signalr_message(self, args: list[str]) -> None:
435433

436434
except Exception as e:
437435
logger.error(
438-
f"Error handling websocket notification for session {session_id}: {str(e)}"
436+
f"Error handling websocket notification for session {session_id}: {e}"
439437
)
440438

441439
async def _handle_signalr_error(self, error: Any) -> None:
@@ -450,17 +448,21 @@ async def _handle_signalr_close(self) -> None:
450448
"""Handle SignalR connection close event."""
451449
logger.info("Websocket connection closed.")
452450

453-
async def _start_http_server_process(self) -> None:
454-
"""Spawn the streamable-http server process.
455-
456-
The process is started once and shared across all sessions.
457-
"""
451+
def _get_server_env(self) -> dict[str, str]:
452+
"""Return server env vars, with os.environ merged in for Coded servers."""
458453
env_vars = self._server.env.copy()
459454
if self.server_type is UiPathServerType.Coded:
460455
for name, value in os.environ.items():
461456
if name not in env_vars:
462457
env_vars[name] = value
458+
return env_vars
463459

460+
async def _start_http_server_process(self) -> None:
461+
"""Spawn the streamable-http server process.
462+
463+
The process is started once and shared across all sessions.
464+
"""
465+
env_vars = self._get_server_env()
464466
merged_env = {**os.environ, **env_vars} if env_vars else None
465467
self._http_server_stderr_lines = []
466468
self._http_server_process = await asyncio.create_subprocess_exec(
@@ -501,7 +503,12 @@ async def _wait_for_http_server_ready(
501503

502504
url = self._server.url
503505
if not url:
504-
raise ValueError("streamable-http transport requires url in config")
506+
raise UiPathMcpRuntimeError(
507+
McpErrorCode.CONFIGURATION_ERROR,
508+
"Missing URL for streamable-http server",
509+
"Please specify a 'url' in the server configuration for streamable-http transport.",
510+
UiPathErrorCategory.SYSTEM,
511+
)
505512

506513
for attempt in range(max_retries):
507514
# Check if process has crashed
@@ -602,14 +609,6 @@ async def _register(self) -> None:
602609
initialization_successful = False
603610
tools_result: ListToolsResult | None = None
604611
server_stderr_output = ""
605-
env_vars = self._server.env
606-
607-
# if server is Coded, include environment variables
608-
if self.server_type is UiPathServerType.Coded:
609-
for name, value in os.environ.items():
610-
# config env variables should have precedence over system ones
611-
if name not in env_vars:
612-
env_vars[name] = value
613612

614613
try:
615614
if self._server.is_streamable_http:
@@ -649,7 +648,7 @@ async def _register(self) -> None:
649648
server_params = StdioServerParameters(
650649
command=self._server.command,
651650
args=self._server.args,
652-
env=env_vars,
651+
env=self._get_server_env(),
653652
)
654653

655654
with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary:
@@ -779,49 +778,39 @@ async def _on_session_start_error(self, session_id: str) -> None:
779778
f"Error sending session dispose signal to UiPath MCP Server: {e}"
780779
)
781780

781+
async def _on_keep_alive_response(self, response: CompletionMessage) -> None:
782+
"""Handle keep-alive response: log session state, detect orphaned sandboxed runtimes."""
783+
if response.error:
784+
logger.error(f"Error during keep-alive: {response.error}")
785+
return
786+
session_ids = response.result
787+
logger.info(f"Server active sessions: {session_ids}")
788+
runtime_sessions = {}
789+
for sid, s in self._session_servers.items():
790+
health = s.get_health_info()
791+
runtime_sessions[sid] = {
792+
"task_done": health.task_done,
793+
"active_requests": len(s._active_requests),
794+
}
795+
logger.info(f"Runtime active sessions: {runtime_sessions}")
796+
# If there are no active sessions and this is a sandbox environment
797+
# We need to cancel the runtime
798+
# eg: when user kills the agent that triggered the runtime, before we subscribe to events
799+
if not session_ids and self.sandboxed and not self._cancel_event.is_set():
800+
logger.warning("No active sessions, cancelling sandboxed runtime...")
801+
self._cancel_event.set()
802+
782803
async def _keep_alive(self) -> None:
783-
"""
784-
Heartbeat to keep the runtime available.
785-
"""
804+
"""Heartbeat to keep the runtime available."""
786805
try:
787806
while not self._cancel_event.is_set():
788807
try:
789-
790-
async def on_keep_alive_response(
791-
response: CompletionMessage,
792-
) -> None:
793-
if response.error:
794-
logger.error(f"Error during keep-alive: {response.error}")
795-
return
796-
session_ids = response.result
797-
logger.info(f"Server active sessions: {session_ids}")
798-
runtime_sessions = {}
799-
for sid, s in self._session_servers.items():
800-
health = s.get_health_info()
801-
runtime_sessions[sid] = {
802-
"task_done": health.task_done,
803-
"active_requests": len(s._active_requests),
804-
}
805-
logger.info(f"Runtime active sessions: {runtime_sessions}")
806-
# If there are no active sessions and this is a sandbox environment
807-
# We need to cancel the runtime
808-
# eg: when user kills the agent that triggered the runtime, before we subscribe to events
809-
if (
810-
not session_ids
811-
and self.sandboxed
812-
and not self._cancel_event.is_set()
813-
):
814-
logger.error(
815-
"No active sessions, cancelling sandboxed runtime..."
816-
)
817-
self._cancel_event.set()
818-
819808
if self._signalr_client:
820809
logger.info("Sending keep-alive ping...")
821810
await self._signalr_client.send(
822811
method="OnKeepAlive",
823812
arguments=[],
824-
on_invocation=on_keep_alive_response, # type: ignore
813+
on_invocation=self._on_keep_alive_response, # type: ignore
825814
)
826815
else:
827816
logger.error("SignalR client not initialized during keep-alive")
@@ -839,9 +828,7 @@ async def on_keep_alive_response(
839828
raise
840829

841830
async def _on_runtime_abort(self) -> None:
842-
"""
843-
Sends a runtime abort signalr to terminate all connected sessions.
844-
"""
831+
"""Send a runtime abort request to terminate all connected sessions."""
845832
try:
846833
response = await self._uipath.api_client.request_async(
847834
"POST",
@@ -854,7 +841,7 @@ async def _on_runtime_abort(self) -> None:
854841
)
855842
else:
856843
logger.error(
857-
f"Error sending runtime abort signalr to UiPath MCP Server: {response.status_code} - {response.text}"
844+
f"Error sending runtime abort to UiPath MCP Server: {response.status_code} - {response.text}"
858845
)
859846
except Exception as e:
860847
logger.error(

0 commit comments

Comments
 (0)