Skip to content
79 changes: 79 additions & 0 deletions lib/galaxy/agents/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(self, app: MinimalManagerApp, trans: ProvidesUserContext):
self._invocations_service: Optional[Any] = None
self._hda_manager: Optional[HDAManager] = None
self._dataset_collections_service: Optional[Any] = None
self._dynamic_tools_manager: Optional[Any] = None

def _encode_id(self, value: int) -> str:
return self.trans.security.encode_id(value)
Expand Down Expand Up @@ -148,6 +149,14 @@ def dataset_collections_service(self):
self._dataset_collections_service = self.app[DatasetCollectionsService]
return self._dataset_collections_service

@property
def dynamic_tools_manager(self):
if self._dynamic_tools_manager is None:
from galaxy.managers.tools import DynamicToolManager
Comment thread
dannon marked this conversation as resolved.
Outdated

self._dynamic_tools_manager = self.app[DynamicToolManager]
return self._dynamic_tools_manager

def connect(self) -> dict[str, Any]:
config = self.app.config
user = self.trans.user
Expand Down Expand Up @@ -873,3 +882,73 @@ def get_user(self) -> dict[str, Any]:
"deleted": user.deleted,
"create_time": user.create_time.isoformat() if user.create_time else None,
}

# ==================== User-Defined Tools (UDT) ====================

def list_user_tools(self, active: bool = True) -> dict[str, Any]:
user = self.trans.user
if not user:
raise ValueError("User must be authenticated")

tools = list(self.dynamic_tools_manager.list_unprivileged_tools(user, active=active))
return {
"tools": [t.to_dict() for t in tools],
"count": len(tools),
}

def create_user_tool(self, representation: dict[str, Any]) -> dict[str, Any]:
from galaxy.tool_util_models.dynamic_tool_models import DynamicUnprivilegedToolCreatePayload

user = self.trans.user
if not user:
raise ValueError("User must be authenticated")

payload = DynamicUnprivilegedToolCreatePayload(src="representation", representation=representation)
dynamic_tool = self.dynamic_tools_manager.create_unprivileged_tool(user, payload)
return dynamic_tool.to_dict()

def delete_user_tool(self, uuid: str) -> dict[str, Any]:
user = self.trans.user
if not user:
raise ValueError("User must be authenticated")

dynamic_tool = self.dynamic_tools_manager.get_unprivileged_tool_by_uuid(user, uuid)
if dynamic_tool is None:
raise ValueError(f"User-defined tool {uuid!r} not found")

self.dynamic_tools_manager.deactivate_unprivileged_tool(user, dynamic_tool)
return {"uuid": uuid, "deactivated": True}

def run_user_tool(self, history_id: str, tool_uuid: str, inputs: dict[str, Any]) -> dict[str, Any]:
from sqlalchemy import select

from galaxy.model import UserDynamicToolAssociation
Comment thread
dannon marked this conversation as resolved.
Outdated

user = self.trans.user
if not user:
raise ValueError("User must be authenticated")

dynamic_tool = self.dynamic_tools_manager.get_unprivileged_tool_by_uuid(user, tool_uuid)
if dynamic_tool is None:
raise ValueError(f"User-defined tool {tool_uuid!r} not found")
# UDT deactivation is per-user by design: deactivate_unprivileged_tool only
# flips the user-association, leaving DynamicTool.active intact so other
# users sharing the underlying tool aren't affected. The runtime check has
# to look at the association, not just dynamic_tool.active.
session = self.dynamic_tools_manager.session()
assoc_active = session.scalar(
select(UserDynamicToolAssociation.active).where(
UserDynamicToolAssociation.user_id == user.id,
UserDynamicToolAssociation.dynamic_tool_id == dynamic_tool.id,
)
)
if not dynamic_tool.active or not assoc_active:
raise ValueError(f"User-defined tool {tool_uuid!r} is deactivated")

payload = {
"history_id": history_id,
"tool_uuid": tool_uuid,
"inputs": inputs,
}
result = self.tools_service._create(self.trans, payload)
return self._encode_ids_in_response(result)
134 changes: 134 additions & 0 deletions lib/galaxy/webapps/galaxy/api/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,140 @@ def get_user(api_key: str, ctx: MCPContext) -> dict[str, Any]:
ops_manager = get_operations_manager(api_key, ctx)
return ops_manager.get_user()

# ==================== User-Defined Tools (UDT) ====================

@mcp.tool()
def list_user_tools(api_key: str, ctx: MCPContext, active: bool = True) -> dict[str, Any]:
"""List user-defined tools belonging to the current user.

Args:
active: If True (default), only show active tools. Set False to
include deactivated tools.

Returns:
Dict with 'tools' (list of user tools, each with id, uuid, tool_id,
name, and active status) and 'count'.
"""
with _mcp_error_handler("list_user_tools"):
ops_manager = get_operations_manager(api_key, ctx)
return ops_manager.list_user_tools(active)

@mcp.tool()
def create_user_tool(representation: dict[str, Any], api_key: str, ctx: MCPContext) -> dict[str, Any]:
"""Create a user-defined tool in Galaxy from a YAML tool definition.

User-defined tools are lightweight, containerized tools that can be
created without admin privileges. They are stored in the database,
scoped to the creating user, and can be embedded in workflows
(importing the workflow automatically creates the tool for the
importing user).

Requires the USER_TOOL_EXECUTE role on the calling user and
enable_beta_tool_formats=true in the Galaxy config; both are enforced
by the underlying manager and surface as permission/config errors here.

Args:
representation: The tool definition as a dictionary matching the
GalaxyUserTool schema. Required fields:
- class: "GalaxyUserTool" (exactly this string)
- id: tool identifier (lowercase, no spaces, 3-255 chars)
- version: version string (e.g. "0.1.0")
- name: display name shown in Galaxy tool menu
- container: container image as a STRING (e.g. "python:3.12-slim"),
NOT a dict -- this is a common mistake
- shell_command: the command to execute, with $(inputs.name.path)
for data inputs and $(inputs.name) for parameter inputs
- inputs: list of input dicts, each with "name" and "type"
(type can be: "data", "integer", "float", "text", "boolean")
- outputs: list of output dicts, each with "name", "type": "data",
"format" (e.g. "tabular", "vcf", "bed"), and "from_work_dir"

Returns:
Dict with the created tool's id, uuid, tool_id, active status, and
the validated representation.

Example:
create_user_tool({
"class": "GalaxyUserTool",
"id": "my_filter",
"version": "0.1.0",
"name": "My Filter",
"container": "python:3.12-slim",
"shell_command": "python3 -c 'import sys; ...'",
"inputs": [{"name": "input1", "type": "data", "format": "tabular"}],
"outputs": [
{"name": "output1", "type": "data",
"format": "tabular", "from_work_dir": "out.tsv"}
]
})

NEXT STEPS:
- Run the tool: run_user_tool(history_id, tool_uuid, inputs)
- List your tools: list_user_tools()
- Delete a tool: delete_user_tool(uuid)
"""
with _mcp_error_handler("create_user_tool"):
ops_manager = get_operations_manager(api_key, ctx)
return ops_manager.create_user_tool(representation)

@mcp.tool()
def delete_user_tool(uuid: str, api_key: str, ctx: MCPContext) -> dict[str, Any]:
"""Deactivate a user-defined tool. Deactivated tools are not loaded into the toolbox.

Existing job history that referenced the tool is preserved; only
future runs are blocked.

Args:
uuid: The UUID of the tool to deactivate. Get this from list_user_tools().

Returns:
Dict confirming deactivation: {"uuid": ..., "deactivated": True}.
"""
with _mcp_error_handler("delete_user_tool"):
ops_manager = get_operations_manager(api_key, ctx)
return ops_manager.delete_user_tool(uuid)

@mcp.tool()
def run_user_tool(
history_id: str,
tool_uuid: str,
inputs: dict[str, Any],
api_key: str,
ctx: MCPContext,
) -> dict[str, Any]:
"""Run a user-defined tool by UUID, producing outputs in the given history.

Resolution happens through the tool service's standard run path,
which accepts tool_uuid in the payload and dispatches via the
toolbox's unprivileged-tool resolver -- so this is functionally a
UUID-keyed counterpart to run_tool().

Args:
history_id: Galaxy history ID where outputs will be placed.
tool_uuid: The UUID of the user-defined tool (from create_user_tool
or list_user_tools).
inputs: Tool input parameters keyed by input name.
- Dataset inputs: {"input_name": {"src": "hda", "id": "<dataset_id>"}}
- Collection inputs: {"input_name": {"src": "hdca", "id": "<collection_id>"}}
- Scalar parameters: {"param_name": value}

Returns:
Dict with job info (job_id, history_id, state) and output dataset IDs.

Example:
run_user_tool(
history_id="abc123",
tool_uuid="61d15277-a911-45ef-aa66-5385146578cc",
inputs={
"scorer_output": {"src": "hda", "id": "59ace41fc068d3ad"},
"top_tracks_per_variant": 5,
},
)
"""
with _mcp_error_handler("run_user_tool"):
ops_manager = get_operations_manager(api_key, ctx)
return ops_manager.run_user_tool(history_id, tool_uuid, inputs)

mcp_app = mcp.http_app(path="/")
mcp_app.state.mcp_server = mcp

Expand Down
129 changes: 129 additions & 0 deletions test/integration/test_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ class TestMCPServerSmoke(IntegrationTestCase):
@classmethod
def handle_galaxy_config_kwds(cls, config):
config["enable_mcp_server"] = True
config["enable_beta_tool_formats"] = True

def _get_mcp_server(self):
from galaxy.webapps.galaxy.api.mcp import get_mcp_app
Expand All @@ -293,6 +294,13 @@ def _get_api_key(self):
_, api_key = self._setup_user_get_key("mcp_test_user@test.com")
return api_key

def _setup_udt_user(self, email: str):
"""Create a user, grant USER_TOOL_EXECUTE, return (user, api_key)."""
user, api_key = self._setup_user_get_key(email)
populator = DatasetPopulator(self.galaxy_interactor)
populator.create_role([user["id"]], role_type="user_tool_execute")
return user, api_key

def _run_async(self, coro):
import asyncio

Expand Down Expand Up @@ -333,6 +341,10 @@ async def _list():
"upload_file_from_url",
"invoke_workflow",
"get_job_status",
"list_user_tools",
"create_user_tool",
"delete_user_tool",
"run_user_tool",
}
assert expected.issubset(tool_names), f"Missing tools: {expected - tool_names}"

Expand Down Expand Up @@ -402,3 +414,120 @@ async def _search():
assert "query" in data
assert "count" in data
assert isinstance(data["tools"], list)

def test_mcp_list_user_tools_empty(self):
"""list_user_tools() returns an empty list for a user with the role and no UDTs."""
from fastmcp import Client
Comment thread
dannon marked this conversation as resolved.
Outdated

mcp_server = self._get_mcp_server()
_, api_key = self._setup_udt_user("udt_list_user@test.com")

async def _list():
async with Client(mcp_server) as client:
return await client.call_tool("list_user_tools", {"api_key": api_key})

result = self._run_async(_list())
assert not result.is_error, result
data = result.data
assert data["tools"] == []
assert data["count"] == 0

def test_mcp_create_user_tool(self):
"""create_user_tool() persists a UDT and returns its uuid."""
from fastmcp import Client

from galaxy_test.base.populators import TOOL_WITH_SHELL_COMMAND
Comment thread
dannon marked this conversation as resolved.
Outdated

mcp_server = self._get_mcp_server()
_, api_key = self._setup_udt_user("udt_create_user@test.com")

async def _create():
async with Client(mcp_server) as client:
return await client.call_tool(
"create_user_tool",
{"api_key": api_key, "representation": TOOL_WITH_SHELL_COMMAND},
)

result = self._run_async(_create())
assert not result.is_error, result
data = result.data
assert "uuid" in data
assert data["representation"]["name"] == TOOL_WITH_SHELL_COMMAND["name"]

def test_mcp_delete_user_tool(self):
"""delete_user_tool() deactivates a UDT so list_user_tools no longer returns it."""
from fastmcp import Client

from galaxy_test.base.populators import TOOL_WITH_SHELL_COMMAND

mcp_server = self._get_mcp_server()
_, api_key = self._setup_udt_user("udt_delete_user@test.com")
populator = DatasetPopulator(self._get_interactor(api_key=api_key))
history_id = populator.new_history()

async def _flow():
async with Client(mcp_server) as client:
create = await client.call_tool(
"create_user_tool",
{"api_key": api_key, "representation": TOOL_WITH_SHELL_COMMAND},
)
uuid = create.data["uuid"]
await client.call_tool("delete_user_tool", {"api_key": api_key, "uuid": uuid})
listed = await client.call_tool("list_user_tools", {"api_key": api_key})
deleted_run = await client.call_tool(
"run_user_tool",
{
"api_key": api_key,
"history_id": history_id,
"tool_uuid": uuid,
"inputs": {},
},
raise_on_error=False,
)
return uuid, listed, deleted_run

uuid, listed, deleted_run = self._run_async(_flow())
uuids_after = {t["uuid"] for t in listed.data["tools"]}
assert uuid not in uuids_after
assert deleted_run.is_error
assert "deactivated" in deleted_run.content[0].text

def test_mcp_run_user_tool(self):
"""run_user_tool() executes a UDT against an HDA input and produces an output."""
from fastmcp import Client

from galaxy_test.base.populators import TOOL_WITH_SHELL_COMMAND

mcp_server = self._get_mcp_server()
_, api_key = self._setup_udt_user("udt_run_user@test.com")

populator = DatasetPopulator(self._get_interactor(api_key=api_key))
history_id = populator.new_history()
dataset = populator.new_dataset(history_id=history_id, content="abc")

async def _flow():
async with Client(mcp_server) as client:
create = await client.call_tool(
"create_user_tool",
{"api_key": api_key, "representation": TOOL_WITH_SHELL_COMMAND},
)
uuid = create.data["uuid"]
return await client.call_tool(
"run_user_tool",
{
"api_key": api_key,
"history_id": history_id,
"tool_uuid": uuid,
"inputs": {"input": {"src": "hda", "id": dataset["id"]}},
},
)

result = self._run_async(_flow())
assert not result.is_error, result
data = result.data
assert data["jobs"][0]["tool_id"] == TOOL_WITH_SHELL_COMMAND["id"]
assert data["jobs"][0]["history_id"] == history_id
assert data["outputs"][0]["output_name"] == "output"
populator.wait_for_history(history_id, assert_ok=True)
output = populator.get_history_dataset_content(history_id)
assert output == "abc\n"
Loading