diff --git a/deployments/k8s b/deployments/k8s index d19d6666ad..d94b52b411 160000 --- a/deployments/k8s +++ b/deployments/k8s @@ -1 +1 @@ -Subproject commit d19d6666adf43f51cc123862fe950ca1c1d9d03b +Subproject commit d94b52b411a61111d475662a3fc012bb96c19cf2 diff --git a/frontend/src/app/workspaces/[workspaceId]/inbox/page.tsx b/frontend/src/app/workspaces/[workspaceId]/inbox/page.tsx index 8a8063c317..638ec9d1be 100644 --- a/frontend/src/app/workspaces/[workspaceId]/inbox/page.tsx +++ b/frontend/src/app/workspaces/[workspaceId]/inbox/page.tsx @@ -15,6 +15,7 @@ export default function InboxPage() { const { sessions, + groups, selectedId, setSelectedId, isLoading: inboxIsLoading, @@ -55,6 +56,7 @@ export default function InboxPage() { return ( } +/** + * Summary of a user for inbox item context. + */ +export type UserSummary = { + /** + * User ID + */ + id: string + /** + * User email + */ + email: string + /** + * User first name + */ + first_name?: string | null + /** + * User last name + */ + last_name?: string | null +} + export type UserUpdate = { password?: string | null email?: string | null @@ -11309,12 +11343,20 @@ export type InboxGetPendingCountResponse = InboxPendingCount export type InboxListItemsData = { cursor?: string | null + /** + * Filter items to a single display group + */ + group?: InboxGroup | null limit?: number /** * Column name to order by (created_at, updated_at, status) */ orderBy?: string | null reverse?: boolean + /** + * Case-insensitive search on item title + */ + search?: string | null /** * Sort direction (asc or desc) */ diff --git a/frontend/src/components/inbox/activity-accordion.tsx b/frontend/src/components/inbox/activity-accordion.tsx deleted file mode 100644 index 3475504e4a..0000000000 --- a/frontend/src/components/inbox/activity-accordion.tsx +++ /dev/null @@ -1,188 +0,0 @@ -"use client" - -import * as AccordionPrimitive from "@radix-ui/react-accordion" -import { - CheckCircle2Icon, - ChevronRightIcon, - CirclePauseIcon, - LoaderCircleIcon, - XCircleIcon, -} from "lucide-react" -import { useMemo } from "react" -import { ScrollArea } from "@/components/ui/scroll-area" -import type { AgentDerivedStatus, InboxSessionItem } from "@/lib/agents" -import { cn } from "@/lib/utils" -import { ActivityItem } from "./activity-item" - -// Define the status groups we want to display -type StatusGroup = "review_required" | "running" | "error" | "completed" - -interface StatusGroupConfig { - label: string - icon: React.ComponentType<{ className?: string }> - statuses: AgentDerivedStatus[] - iconColor: string -} - -const STATUS_GROUPS: Record = { - review_required: { - label: "Review required", - icon: CirclePauseIcon, - statuses: ["PENDING_APPROVAL"], - iconColor: "text-primary", - }, - running: { - label: "In progress", - icon: LoaderCircleIcon, - statuses: ["RUNNING", "CONTINUED_AS_NEW"], - iconColor: "text-muted-foreground", - }, - error: { - label: "Error", - icon: XCircleIcon, - statuses: ["FAILED", "TIMED_OUT", "TERMINATED"], - iconColor: "text-red-600", - }, - completed: { - label: "Completed", - icon: CheckCircle2Icon, - statuses: ["COMPLETED", "CANCELED", "UNKNOWN"], - iconColor: "text-green-600", - }, -} - -// Order in which groups should appear -const GROUP_ORDER: StatusGroup[] = [ - "review_required", - "running", - "error", - "completed", -] - -interface ActivityAccordionProps { - sessions: InboxSessionItem[] - selectedId: string | null - deletingId: string | null - onSelect: (id: string) => void - onDelete?: (id: string) => void -} - -export function ActivityAccordion({ - sessions, - selectedId, - deletingId, - onSelect, - onDelete, -}: ActivityAccordionProps) { - // Group sessions by status category - const groupedSessions = useMemo(() => { - const groups: Record = { - review_required: [], - running: [], - error: [], - completed: [], - } - - for (const session of sessions) { - for (const [groupKey, config] of Object.entries(STATUS_GROUPS)) { - if (config.statuses.includes(session.derivedStatus)) { - groups[groupKey as StatusGroup].push(session) - break - } - } - } - - // Sort sessions within each group by updated_at (most recent first) - for (const groupKey of Object.keys(groups) as StatusGroup[]) { - groups[groupKey].sort( - (a, b) => - new Date(b.updated_at).getTime() - new Date(a.updated_at).getTime() - ) - } - - return groups - }, [sessions]) - - // Calculate which groups have items and should be expanded by default - const defaultExpandedGroups = useMemo(() => { - return GROUP_ORDER.filter((group) => groupedSessions[group].length > 0) - }, [groupedSessions]) - - return ( - - - {GROUP_ORDER.map((groupKey) => { - const config = STATUS_GROUPS[groupKey] - const groupSessions = groupedSessions[groupKey] - const StatusIcon = config.icon - - return ( - - - - {/* Match SidebarTrigger dimensions (h-7 w-7) for alignment */} -
- -
-
- - {config.label} - - {groupSessions.length} - -
-
-
- -
- {groupSessions.map((session) => ( - onSelect(session.id)} - onDelete={ - onDelete && groupKey === "review_required" - ? () => onDelete(session.id) - : undefined - } - /> - ))} -
-
-
- ) - })} -
-
- ) -} diff --git a/frontend/src/components/inbox/activity-item.tsx b/frontend/src/components/inbox/activity-item.tsx deleted file mode 100644 index f15921d42d..0000000000 --- a/frontend/src/components/inbox/activity-item.tsx +++ /dev/null @@ -1,177 +0,0 @@ -import { - AnvilIcon, - FlaskConicalIcon, - LayersIcon, - MessageSquareIcon, - Trash2Icon, - WorkflowIcon, -} from "lucide-react" -import { useState } from "react" -import { - EventCreatedAt, - EventUpdatedAt, -} from "@/components/cases/cases-feed-event" -import { - AlertDialog, - AlertDialogAction, - AlertDialogCancel, - AlertDialogContent, - AlertDialogDescription, - AlertDialogFooter, - AlertDialogHeader, - AlertDialogTitle, -} from "@/components/ui/alert-dialog" -import { Badge } from "@/components/ui/badge" -import type { InboxSessionItem } from "@/lib/agents" -import { cn } from "@/lib/utils" - -interface ActivityItemProps { - session: InboxSessionItem - isSelected: boolean - isDeleting?: boolean - onClick: () => void - onDelete?: () => void -} - -type SourceType = "workflow" | "case" | "chat" | "test" | "assistant" - -interface SourceConfig { - label: string - icon: React.ComponentType<{ className?: string }> -} - -const SOURCE_CONFIGS: Record = { - workflow: { - label: "Workflow", - icon: WorkflowIcon, - }, - case: { - label: "Case", - icon: LayersIcon, - }, - chat: { - label: "Chat", - icon: MessageSquareIcon, - }, - test: { - label: "Test", - icon: FlaskConicalIcon, - }, - assistant: { - label: "Build", - icon: AnvilIcon, - }, -} - -function getSourceType(entityType: string): SourceType { - switch (entityType) { - case "workflow": - return "workflow" - case "case": - return "case" - case "agent_preset": - return "test" - case "agent_preset_builder": - return "assistant" - case "copilot": - default: - return "chat" - } -} - -export function ActivityItem({ - session, - isSelected, - isDeleting, - onClick, - onDelete, -}: ActivityItemProps) { - const [confirmOpen, setConfirmOpen] = useState(false) - - const displayName = - session.parent_workflow?.alias || - session.parent_workflow?.title || - session.title - - const sourceType = getSourceType(session.entity_type) - const sourceConfig = SOURCE_CONFIGS[sourceType] - const SourceIcon = sourceConfig.icon - - return ( - <> -
- - - {onDelete && ( - - )} -
- - - - - Delete approval - - This will deny all pending tool calls and remove this approval - from the inbox. - - - - Cancel - { - setConfirmOpen(false) - onDelete?.() - }} - > - Delete - - - - - - ) -} diff --git a/frontend/src/components/inbox/index.tsx b/frontend/src/components/inbox/index.tsx index 87d209a3b8..ea11242126 100644 --- a/frontend/src/components/inbox/index.tsx +++ b/frontend/src/components/inbox/index.tsx @@ -2,21 +2,23 @@ import { useEffect, useRef } from "react" import { useInboxChat } from "@/app/workspaces/[workspaceId]/inbox/layout" -import type { AgentSessionEntity } from "@/client" +import type { AgentSessionEntity, InboxGroup } from "@/client" import { useScopeCheck } from "@/components/auth/scope-guard" import { CenteredSpinner } from "@/components/loading/spinner" import { toast } from "@/components/ui/use-toast" import { type DateFilterValue, + type InboxGroupState, type UseInboxFilters, useDeleteApproval, } from "@/hooks/use-inbox" import type { InboxSessionItem } from "@/lib/agents" -import { ActivityAccordion } from "./activity-accordion" import { InboxHeader } from "./inbox-header" +import { RunsTable } from "./runs-table" interface ActivityLayoutProps { sessions: InboxSessionItem[] + groups: Record selectedId: string | null onSelect: (id: string | null) => void isLoading: boolean @@ -31,6 +33,7 @@ interface ActivityLayoutProps { export function ActivityLayout({ sessions, + groups, selectedId, onSelect, isLoading, @@ -150,8 +153,8 @@ export function ActivityLayout({ onCreatedAfterChange={onCreatedAfterChange} />
- +} + +const SOURCE_CONFIGS: Record = { + workflow: { + label: "Workflow", + icon: WorkflowIcon, + }, + case: { + label: "Case", + icon: LayersIcon, + }, + chat: { + label: "Chat", + icon: MessageSquareIcon, + }, + test: { + label: "Test", + icon: FlaskConicalIcon, + }, + assistant: { + label: "Build", + icon: AnvilIcon, + }, +} + +type StatusGroup = InboxGroup + +interface StatusGroupConfig { + label: string + icon: React.ComponentType<{ className?: string }> + iconColor: string +} + +const STATUS_GROUPS: Record = { + review_required: { + label: "Review required", + icon: CirclePauseIcon, + iconColor: "text-primary", + }, + running: { + label: "In progress", + icon: LoaderCircleIcon, + iconColor: "text-muted-foreground", + }, + error: { + label: "Error", + icon: XCircleIcon, + iconColor: "text-red-600", + }, + completed: { + label: "Completed", + icon: CheckCircle2Icon, + iconColor: "text-green-600", + }, +} + +// Shared column template so the global header and group rows stay aligned. +const GRID_COLS = "grid-cols-[minmax(0,1fr)_7rem_10rem_8rem_8rem]" +const GRID_COLS_WITH_ACTIONS = + "grid-cols-[minmax(0,1fr)_7rem_10rem_8rem_8rem_2rem]" + +function getSourceType(entityType: string): SourceType { + switch (entityType) { + case "workflow": + case "external_channel": + return "workflow" + case "case": + return "case" + case "agent_preset": + return "test" + case "agent_preset_builder": + return "assistant" + default: + return "chat" + } +} + +function getDisplayName(session: InboxSessionItem): string { + return ( + session.parent_workflow?.alias || + session.parent_workflow?.title || + session.title + ) +} + +function getCreatorName(session: InboxSessionItem): string { + if (!session.created_by) { + return "" + } + return getUserDisplayName(session.created_by) +} + +const DEFAULT_SORT_DIRECTIONS: Record = { + title: "asc", + source: "asc", + created_by: "asc", + created_at: "desc", + updated_at: "desc", +} + +function compareSessions( + a: InboxSessionItem, + b: InboxSessionItem, + sortKey: SortKey +): number { + switch (sortKey) { + case "title": + return getDisplayName(a).localeCompare(getDisplayName(b)) + case "source": + return getSourceType(a.entity_type).localeCompare( + getSourceType(b.entity_type) + ) + case "created_by": + return getCreatorName(a).localeCompare(getCreatorName(b)) + case "created_at": + return new Date(a.created_at).getTime() - new Date(b.created_at).getTime() + case "updated_at": + return new Date(a.updated_at).getTime() - new Date(b.updated_at).getTime() + } +} + +interface SortableHeadProps { + label: string + sortKey: SortKey + activeKey: SortKey + direction: SortDirection + onSort: (key: SortKey) => void + className?: string +} + +function SortableHead({ + label, + sortKey, + activeKey, + direction, + onSort, + className, +}: SortableHeadProps) { + const isActive = activeKey === sortKey + let SortIcon = ArrowUpDownIcon + if (isActive) { + SortIcon = direction === "asc" ? ArrowUpIcon : ArrowDownIcon + } + + return ( + + ) +} + +function formatTimestamp(date: Date): string { + const now = new Date() + if (date.getFullYear() === now.getFullYear()) { + return format(date, "MMM d, HH:mm") + } + return format(date, "MMM d, yyyy") +} + +function TimestampCell({ value, label }: { value: string; label: string }) { + const date = new Date(value) + return ( + + + + + {formatTimestamp(date)} + + + + {label}: {date.toLocaleString()} + + + + ) +} + +interface RunRowProps { + session: InboxSessionItem + isSelected: boolean + isDeleting: boolean + showActions: boolean + onSelect: (id: string) => void + onRequestDelete?: (id: string) => void +} + +function RunRow({ + session, + isSelected, + isDeleting, + showActions, + onSelect, + onRequestDelete, +}: RunRowProps) { + const sourceConfig = SOURCE_CONFIGS[getSourceType(session.entity_type)] + const SourceIcon = sourceConfig.icon + + return ( +
onSelect(session.id)} + onKeyDown={(e) => { + if (e.target !== e.currentTarget) { + return + } + if (e.key === "Enter" || e.key === " ") { + e.preventDefault() + onSelect(session.id) + } + }} + role="button" + tabIndex={0} + className={cn( + "group/row grid cursor-pointer items-center gap-2 border-b border-border/50 py-2 pl-3 pr-3 transition-colors", + showActions ? GRID_COLS_WITH_ACTIONS : GRID_COLS, + "hover:bg-muted/50", + isSelected && "bg-muted hover:bg-muted" + )} + > + + {getDisplayName(session)} + + + + {sourceConfig.label} + +
+ {session.created_by ? ( + <> + + + {getUserDisplayName(session.created_by)} + + + ) : ( + + )} +
+ + + {showActions && ( +
+ {onRequestDelete && session.pendingApprovalCount > 0 && ( + + )} +
+ )} +
+ ) +} + +interface RunsTableProps { + groups: Record + selectedId: string | null + deletingId: string | null + onSelect: (id: string) => void + onDelete?: (id: string) => void +} + +/** + * Agent runs grouped by status into accordion sections, with sortable + * columns shared across groups. Each group is paginated independently on + * the server and exposes a "Show more" row when more items exist. Rows are + * clickable and open the run's session detail; runs with pending approvals + * can be dismissed inline. + */ +export function RunsTable({ + groups, + selectedId, + deletingId, + onSelect, + onDelete, +}: RunsTableProps) { + const [sortKey, setSortKey] = useState("updated_at") + const [sortDirection, setSortDirection] = useState("desc") + const [confirmDeleteId, setConfirmDeleteId] = useState(null) + // Track user-collapsed groups so newly appearing groups start expanded + const [collapsedGroups, setCollapsedGroups] = useState([]) + + const showActions = Boolean(onDelete) + + const handleSort = (key: SortKey) => { + if (key === sortKey) { + setSortDirection((prev) => (prev === "asc" ? "desc" : "asc")) + } else { + setSortKey(key) + setSortDirection(DEFAULT_SORT_DIRECTIONS[key]) + } + } + + // Sort each server-provided group's loaded sessions by the active column + const groupedSessions = useMemo(() => { + const factor = sortDirection === "asc" ? 1 : -1 + const sorted = {} as Record + for (const groupKey of INBOX_GROUP_ORDER) { + sorted[groupKey] = [...groups[groupKey].sessions].sort((a, b) => { + const primary = compareSessions(a, b, sortKey) * factor + if (primary !== 0) { + return primary + } + // Tiebreak on most recently updated first + return ( + new Date(b.updated_at).getTime() - new Date(a.updated_at).getTime() + ) + }) + } + return sorted + }, [groups, sortKey, sortDirection]) + + // Only render groups that have items (loaded or unloaded); empty groups are hidden + const visibleGroups = useMemo(() => { + return INBOX_GROUP_ORDER.filter( + (group) => groupedSessions[group].length > 0 || groups[group].hasMore + ) + }, [groupedSessions, groups]) + + const expandedGroups = visibleGroups.filter( + (group) => !collapsedGroups.includes(group) + ) + + const handleExpandedChange = (open: string[]) => { + setCollapsedGroups(visibleGroups.filter((group) => !open.includes(group))) + } + + return ( +
+ {/* Global sortable column header shared by all status groups */} +
+ + + + + + {showActions && } +
+ + + {visibleGroups.length === 0 && ( +
+ No agent runs +
+ )} + + {visibleGroups.map((groupKey) => { + const config = STATUS_GROUPS[groupKey] + const groupSessions = groupedSessions[groupKey] + const groupState = groups[groupKey] + const StatusIcon = config.icon + + return ( + + + + {/* Match SidebarTrigger dimensions (h-7 w-7) for alignment */} +
+ +
+
+ + + {config.label} + + + {groupSessions.length} + {groupState.hasMore ? "+" : ""} + +
+
+
+ + {groupSessions.map((session) => ( + + ))} + {groupState.hasMore && ( + + )} + +
+ ) + })} +
+
+ + { + if (!open) { + setConfirmDeleteId(null) + } + }} + > + + + Delete approval + + This will deny all pending tool calls and remove this approval + from the inbox. + + + + Cancel + { + if (confirmDeleteId) { + onDelete?.(confirmDeleteId) + } + setConfirmDeleteId(null) + }} + > + Delete + + + + +
+ ) +} diff --git a/frontend/src/components/sidebar/app-sidebar.tsx b/frontend/src/components/sidebar/app-sidebar.tsx index d45849329a..ecf19a3333 100644 --- a/frontend/src/components/sidebar/app-sidebar.tsx +++ b/frontend/src/components/sidebar/app-sidebar.tsx @@ -264,7 +264,7 @@ export function AppSidebar({ ...props }: React.ComponentProps) { visible: canViewWorkflows === true, }, { - title: "Approvals", + title: "Inbox", url: `${basePath}/inbox`, icon: ListChecksIcon, isActive: pathname?.startsWith(`${basePath}/inbox`), diff --git a/frontend/src/hooks/pagination/use-cursor-pagination.tsx b/frontend/src/hooks/pagination/use-cursor-pagination.tsx index 90e0f9dfc4..b42a1db435 100644 --- a/frontend/src/hooks/pagination/use-cursor-pagination.tsx +++ b/frontend/src/hooks/pagination/use-cursor-pagination.tsx @@ -1,6 +1,6 @@ "use client" -import { useQuery } from "@tanstack/react-query" +import { type Query, useQuery } from "@tanstack/react-query" import { useCallback, useState } from "react" import type { ApiError } from "@/client" @@ -39,8 +39,19 @@ export interface UseCursorPaginationOptions< enabled?: boolean staleTime?: number refetchOnWindowFocus?: boolean - refetchInterval?: number | false + refetchInterval?: + | number + | false + | ((query: Query, ApiError>) => number | false) refetchIntervalInBackground?: boolean + retry?: + | boolean + | number + | ((failureCount: number, error: ApiError) => boolean) + placeholderData?: ( + previousData: CursorPaginationResponse | undefined, + previousQuery: Query, ApiError> | undefined + ) => CursorPaginationResponse | undefined } export interface CursorPaginationState { @@ -66,6 +77,8 @@ export function useCursorPagination({ refetchOnWindowFocus, refetchInterval, refetchIntervalInBackground, + retry, + placeholderData, }: UseCursorPaginationOptions) { const [paginationState, setPaginationState] = useState( DEFAULT_PAGINATION_STATE @@ -113,6 +126,8 @@ export function useCursorPagination({ refetchOnWindowFocus, refetchInterval, refetchIntervalInBackground, + retry, + placeholderData, }) const goToNextPage = () => { diff --git a/frontend/src/hooks/use-inbox.ts b/frontend/src/hooks/use-inbox.ts index 74756368e7..f197cf81c7 100644 --- a/frontend/src/hooks/use-inbox.ts +++ b/frontend/src/hooks/use-inbox.ts @@ -1,20 +1,23 @@ "use client" import { - type Query, + type InfiniteData, + keepPreviousData, + useInfiniteQuery, useMutation, - useQuery, useQueryClient, } from "@tanstack/react-query" import { useCallback, useEffect, useMemo, useState } from "react" import { type AgentSessionEntity, approvalsDeleteApproval, + type InboxGroup, type InboxItemRead, type InboxItemStatus, type InboxListItemsResponse, inboxListItems, } from "@/client" +import { useDebounce } from "@/hooks/use-debounce" import { type AgentDerivedStatus, type AgentStatusTone, @@ -39,8 +42,26 @@ export interface UseInboxFilters { createdAfter: DateFilterValue } +/** Display order of inbox status groups, most urgent first. */ +export const INBOX_GROUP_ORDER: InboxGroup[] = [ + "review_required", + "running", + "error", + "completed", +] + +/** Paginated state of a single inbox status group. */ +export interface InboxGroupState { + sessions: InboxSessionItem[] + isLoading: boolean + hasMore: boolean + isLoadingMore: boolean + loadMore: () => void +} + export interface UseInboxResult { sessions: InboxSessionItem[] + groups: Record selectedId: string | null setSelectedId: (id: string | null) => void isLoading: boolean @@ -151,6 +172,7 @@ function inboxItemToSessionItem(item: InboxItemRead): InboxSessionItem { alias: item.workflow.alias, } : null, + created_by: item.created_by ?? null, // Status fields derived from inbox status ...statusInfo, @@ -177,154 +199,206 @@ function getDateFromFilter(filter: DateFilterValue): Date | null { } } -export function useInbox(options: UseInboxOptions = {}): UseInboxResult { - const { enabled = true, autoRefresh = true } = options - const workspaceId = useWorkspaceId() - const [selectedId, setSelectedId] = useState(null) - const [searchQuery, setSearchQuery] = useState("") - const [entityType, setEntityType] = useState( - "all" - ) - const [limit, setLimit] = useState(20) - const [updatedAfter, setUpdatedAfter] = useState(null) - const [createdAfter, setCreatedAfter] = useState(null) +interface InboxGroupQueryOptions { + workspaceId: string + group: InboxGroup + limit: number + search: string + enabled: boolean + autoRefresh: boolean + pollMs: number +} - /** - * Computes the refetch interval for inbox items based on current state. - * - * Returns `false` to disable polling when: - * - Auto-refresh is disabled - * - The browser tab is hidden - * - * Otherwise returns an interval in milliseconds: - * - 3000ms (3s): When there are pending approvals - * - 10000ms (10s): When no items exist or all are in terminal states - */ - const computeRefetchInterval = useCallback( - ( - query: Query< - InboxListItemsResponse, - TracecatApiError, - InboxListItemsResponse, - readonly unknown[] - > - ) => { +/** + * Infinite query over a single inbox status group. + * + * Pages are appended ("show more") rather than replaced, and polling pauses + * when auto-refresh is off or the tab is hidden. + */ +function useInboxGroupQuery({ + workspaceId, + group, + limit, + search, + enabled, + autoRefresh, + pollMs, +}: InboxGroupQueryOptions) { + return useInfiniteQuery< + InboxListItemsResponse, + TracecatApiError, + InfiniteData, + readonly unknown[], + string | null + >({ + queryKey: ["inbox-items", workspaceId, group, limit, search], + queryFn: ({ pageParam }) => + inboxListItems({ + workspaceId, + limit, + cursor: pageParam, + search: search || null, + group, + }), + initialPageParam: null, + getNextPageParam: (lastPage) => + lastPage.has_more && lastPage.next_cursor ? lastPage.next_cursor : null, + enabled, + retry: retryHandler, + refetchInterval: (query) => { if (!autoRefresh) { return false } - if ( typeof document !== "undefined" && document.visibilityState === "hidden" ) { return false } - - const data = query.state.data - - if (!data || data.items.length === 0) { - return 10000 - } - - const hasPendingApproval = data.items.some( - (item) => item.status === "pending" - ) - const hasRunningExecution = data.items.some( - (item) => - typeof item.metadata?.temporal_status === "string" && - (item.metadata.temporal_status === "RUNNING" || - item.metadata.temporal_status === "CONTINUED_AS_NEW") - ) - if (hasPendingApproval || hasRunningExecution) { - return 3000 + // Only poll when a single page is loaded; additional pages from "show + // more" would each be re-fetched on every tick, causing request fan-out. + const pageCount = query.state.data?.pages.length ?? 0 + if (pageCount > 1) { + return false } - - return 10000 + return pollMs }, - [autoRefresh] + placeholderData: keepPreviousData, + }) +} + +export function useInbox(options: UseInboxOptions = {}): UseInboxResult { + const { enabled = true, autoRefresh = true } = options + const workspaceId = useWorkspaceId() + const [selectedId, setSelectedId] = useState(null) + const [searchQuery, setSearchQuery] = useState("") + const [debouncedSearchQuery] = useDebounce(searchQuery, 300) + const normalizedSearchQuery = debouncedSearchQuery.trim() + const [entityType, setEntityType] = useState( + "all" ) + const [limit, setLimit] = useState(20) + const [updatedAfter, setUpdatedAfter] = useState(null) + const [createdAfter, setCreatedAfter] = useState(null) - // Fetch inbox items from the unified inbox endpoint - // This endpoint properly aggregates approval status from the backend - const { - data: sessions, - isLoading, - error, - refetch, - } = useQuery({ - queryKey: ["inbox-items", workspaceId, limit], - queryFn: () => - inboxListItems({ - workspaceId, - limit, - }), - select: (data) => { - // Convert inbox items to session format and sort by priority - const converted = data.items.map(inboxItemToSessionItem) - // Sort by status priority (pending approvals first), then by updated_at (most recent first) - return converted.sort((a, b) => { - if (a.statusPriority !== b.statusPriority) { - return a.statusPriority - b.statusPriority - } - return ( - new Date(b.updated_at).getTime() - new Date(a.updated_at).getTime() - ) - }) - }, - enabled: enabled && Boolean(workspaceId), - retry: retryHandler, - refetchInterval: computeRefetchInterval, + const baseEnabled = enabled && Boolean(workspaceId) + + // One independently paginated query per status group. Urgent groups poll + // faster; terminal groups poll slowly. + const reviewRequiredQuery = useInboxGroupQuery({ + workspaceId, + group: "review_required", + limit, + search: normalizedSearchQuery, + enabled: baseEnabled, + autoRefresh, + pollMs: 3000, + }) + const runningQuery = useInboxGroupQuery({ + workspaceId, + group: "running", + limit, + search: normalizedSearchQuery, + enabled: baseEnabled, + autoRefresh, + pollMs: 3000, + }) + const errorQuery = useInboxGroupQuery({ + workspaceId, + group: "error", + limit, + search: normalizedSearchQuery, + enabled: baseEnabled, + autoRefresh, + pollMs: 10000, + }) + const completedQuery = useInboxGroupQuery({ + workspaceId, + group: "completed", + limit, + search: normalizedSearchQuery, + enabled: baseEnabled, + autoRefresh, + pollMs: 10000, }) - // Apply client-side filtering (search, entity type, date filters) - const filteredSessions = useMemo(() => { - if (!sessions) return [] - - const updatedAfterDate = getDateFromFilter(updatedAfter) - const createdAfterDate = getDateFromFilter(createdAfter) - const query = searchQuery.toLowerCase().trim() + // Client-side filtering (entity type, date filters) applied per group + const filterSession = useCallback( + (session: InboxSessionItem) => { + const updatedAfterDate = getDateFromFilter(updatedAfter) + const createdAfterDate = getDateFromFilter(createdAfter) - return sessions.filter((session) => { - // Entity type filter if (entityType !== "all" && session.entity_type !== entityType) { return false } - - // Search filter - if (query) { - const title = ( - session.parent_workflow?.alias || - session.parent_workflow?.title || - session.title || - "" - ).toLowerCase() - const entityId = (session.entity_id || "").toLowerCase() - if (!title.includes(query) && !entityId.includes(query)) { - return false - } + if (updatedAfterDate && new Date(session.updated_at) < updatedAfterDate) { + return false } - - // Updated after filter - if (updatedAfterDate) { - const sessionUpdated = new Date(session.updated_at) - if (sessionUpdated < updatedAfterDate) { - return false - } + if (createdAfterDate && new Date(session.created_at) < createdAfterDate) { + return false } + return true + }, + [entityType, updatedAfter, createdAfter] + ) - // Created after filter - if (createdAfterDate) { - const sessionCreated = new Date(session.created_at) - if (sessionCreated < createdAfterDate) { - return false - } + const groupQueries = { + review_required: reviewRequiredQuery, + running: runningQuery, + error: errorQuery, + completed: completedQuery, + } as const + + const groups = useMemo>(() => { + function toGroupState( + query: (typeof groupQueries)[InboxGroup] + ): InboxGroupState { + const items = query.data?.pages.flatMap((page) => page.items) ?? [] + return { + sessions: items.map(inboxItemToSessionItem).filter(filterSession), + isLoading: query.isLoading, + hasMore: query.hasNextPage, + isLoadingMore: query.isFetchingNextPage, + loadMore: () => { + void query.fetchNextPage() + }, } + } + return { + review_required: toGroupState(groupQueries.review_required), + running: toGroupState(groupQueries.running), + error: toGroupState(groupQueries.error), + completed: toGroupState(groupQueries.completed), + } + }, [ + groupQueries.review_required, + groupQueries.running, + groupQueries.error, + groupQueries.completed, + filterSession, + ]) + + const isLoading = INBOX_GROUP_ORDER.some( + (group) => groupQueries[group].isLoading + ) + const allErrors = INBOX_GROUP_ORDER.map( + (group) => groupQueries[group].error + ).filter(Boolean) + const error = + allErrors.length === INBOX_GROUP_ORDER.length + ? (allErrors[0] ?? null) + : null + const refetch = () => { + for (const group of INBOX_GROUP_ORDER) { + void groupQueries[group].refetch() + } + } - return true - }) - }, [sessions, searchQuery, entityType, updatedAfter, createdAfter]) - - const enrichedSessions = filteredSessions + // Flatten groups in display order for selection bookkeeping + const enrichedSessions = useMemo( + () => INBOX_GROUP_ORDER.flatMap((group) => groups[group].sessions), + [groups] + ) // Auto-select first session with pending approval, or clear stale selections useEffect(() => { @@ -350,6 +424,7 @@ export function useInbox(options: UseInboxOptions = {}): UseInboxResult { return { sessions: enrichedSessions, + groups, selectedId, setSelectedId, isLoading, diff --git a/frontend/src/lib/agents.ts b/frontend/src/lib/agents.ts index 53b539fc88..f04c652ae9 100644 --- a/frontend/src/lib/agents.ts +++ b/frontend/src/lib/agents.ts @@ -3,6 +3,7 @@ import type { AgentSessionRead, ApprovalRead, ChatReadMinimal, + UserSummary, WorkflowExecutionStatus, } from "@/client" import { undoSlugify } from "@/lib/utils" @@ -151,6 +152,7 @@ export interface InboxSessionItem { created_at: string updated_at: string parent_workflow: WorkflowSummary | null + created_by: UserSummary | null derivedStatus: AgentDerivedStatus statusLabel: string statusPriority: number diff --git a/packages/tracecat-ee/tracecat_ee/inbox/providers/approvals.py b/packages/tracecat-ee/tracecat_ee/inbox/providers/agent_runs.py similarity index 52% rename from packages/tracecat-ee/tracecat_ee/inbox/providers/approvals.py rename to packages/tracecat-ee/tracecat_ee/inbox/providers/agent_runs.py index 8823ae8001..fe7b1813fc 100644 --- a/packages/tracecat-ee/tracecat_ee/inbox/providers/approvals.py +++ b/packages/tracecat-ee/tracecat_ee/inbox/providers/agent_runs.py @@ -1,4 +1,4 @@ -"""Approvals inbox provider for workflow-initiated agent sessions.""" +"""Agent runs inbox provider for Claude Code agent sessions.""" from __future__ import annotations @@ -7,15 +7,16 @@ from collections.abc import Sequence from typing import TYPE_CHECKING, Any, Literal -from sqlalchemy import and_, distinct, func, or_, select +from sqlalchemy import String, and_, cast, distinct, func, or_, select from temporalio.client import WorkflowExecutionStatus from tracecat.agent.approvals.enums import ApprovalStatus +from tracecat.agent.common.stream_types import HarnessType from tracecat.db.dependencies import AsyncDBSession -from tracecat.db.models import AgentSession, Approval, Workflow +from tracecat.db.models import AgentSession, Approval, User, Workflow from tracecat.dsl.client import get_temporal_client -from tracecat.inbox.schemas import InboxItemRead, WorkflowSummary -from tracecat.inbox.types import InboxItemStatus, InboxItemType +from tracecat.inbox.schemas import InboxItemRead, UserSummary, WorkflowSummary +from tracecat.inbox.types import InboxGroup, InboxItemStatus, InboxItemType from tracecat.logger import logger from tracecat.pagination import BaseCursorPaginator, CursorPaginatedResponse from tracecat_ee.agent.types import AgentWorkflowID @@ -30,14 +31,28 @@ WorkflowExecutionStatus.TERMINATED, } +# Approvals are a workflow concept: only automation-initiated sessions surface +# them in the inbox. Chat-surface approvals are handled inline in the chat UI. +APPROVAL_ENTITY_TYPES = ("workflow", "external_channel") + +# Group membership depends on live Temporal status, so grouped listing scans +# sessions in batches and classifies after enrichment. Each scanned session may +# cost a Temporal describe call, so the scan is hard-capped per request. +GROUP_SCAN_BATCH_SIZE = 50 +GROUP_SCAN_MAX_SESSIONS = 300 + +RUNNING_STATUS_NAMES = {s.name for s in RUNNING_STATUSES} +FAILED_STATUS_NAMES = {s.name for s in FAILED_STATUSES} + if TYPE_CHECKING: from tracecat.auth.types import Role -class ApprovalsInboxProvider(BaseCursorPaginator): - """Provides approval items for the inbox. +class AgentRunsInboxProvider(BaseCursorPaginator): + """Provides agent run items for the inbox. - Filters to workflow-initiated sessions only and enriches with workflow metadata. + Lists root Claude Code agent sessions (plus any legacy sessions with + approvals) and enriches them with approval and workflow metadata. """ def __init__(self, session: AsyncDBSession, role: Role): @@ -96,6 +111,50 @@ async def describe_status( statuses[session_id] = status return statuses + def _base_query(self, search: str | None): + """Base statement selecting inbox-eligible root sessions.""" + # Root sessions only: all Claude Code runs, plus legacy sessions that + # already have approvals so existing inbox items don't disappear. + has_approvals = ( + select(Approval.id).where(Approval.session_id == AgentSession.id).exists() + ) + base_stmt = select(AgentSession).where( + AgentSession.workspace_id == self.workspace_id, + AgentSession.parent_session_id.is_(None), + AgentSession.entity_type != "approval", + or_( + AgentSession.harness_type == HarnessType.CLAUDE_CODE, + and_( + has_approvals, + AgentSession.entity_type.in_(APPROVAL_ENTITY_TYPES), + ), + ), + ) + + if search: + like_term = f"%{search}%" + # Workflow-initiated sessions display the workflow alias/title in + # the inbox, so match those as well as the session title. + workflow_match = ( + select(Workflow.id) + .where( + Workflow.id == AgentSession.entity_id, + or_( + Workflow.title.ilike(like_term), + Workflow.alias.ilike(like_term), + ), + ) + .exists() + ) + base_stmt = base_stmt.where( + or_( + AgentSession.title.ilike(like_term), + cast(AgentSession.entity_id, String).ilike(like_term), + workflow_match, + ) + ) + return base_stmt + async def list_items( self, *, @@ -104,19 +163,21 @@ async def list_items( reverse: bool = False, order_by: str | None = None, sort: Literal["asc", "desc"] | None = None, + search: str | None = None, + group: InboxGroup | None = None, ) -> CursorPaginatedResponse[InboxItemRead]: - """List workflow approval items with cursor pagination.""" - # Base query for workflow-initiated sessions with approvals - base_stmt = ( - select(AgentSession) - .join(Approval, AgentSession.id == Approval.session_id) - .where( - AgentSession.workspace_id == self.workspace_id, - AgentSession.parent_session_id.is_(None), - AgentSession.entity_type.in_(["workflow", "external_channel"]), + """List agent run items with cursor pagination.""" + if group is not None: + return await self._list_items_grouped( + limit=limit, + cursor=cursor, + order_by=order_by, + sort=sort, + search=search, + group=group, ) - .distinct() - ) + + base_stmt = self._base_query(search) # Determine sort column and direction sort_col = order_by or "created_at" @@ -269,6 +330,174 @@ async def list_items( total_estimate=None, ) + @staticmethod + def _classify_item(item: InboxItemRead) -> InboxGroup: + """Classify an enriched inbox item into its display group. + + Must stay in sync with the status grouping in the inbox UI. + """ + metadata = item.metadata or {} + if metadata.get("pending_count"): + return InboxGroup.REVIEW_REQUIRED + temporal_status = metadata.get("temporal_status") + if temporal_status in RUNNING_STATUS_NAMES: + return InboxGroup.RUNNING + if temporal_status in FAILED_STATUS_NAMES: + return InboxGroup.ERROR + if item.status == InboxItemStatus.FAILED: + return InboxGroup.ERROR + return InboxGroup.COMPLETED + + async def _list_items_grouped( + self, + *, + limit: int, + cursor: str | None, + order_by: str | None, + sort: Literal["asc", "desc"] | None, + search: str | None, + group: InboxGroup, + ) -> CursorPaginatedResponse[InboxItemRead]: + """List items belonging to a single display group. + + Group membership requires enrichment (approval counts and live Temporal + status), so this scans sessions in keyset batches, classifies each + batch, and stops once enough matches are collected or the scan cap is + reached. + + The cursor encodes the scan position (created_at/updated_at + id of the + last scanned session), not the last returned item. This means "show + more" resumes the scan where it left off rather than restarting from the + top, so groups whose matching items sit beyond GROUP_SCAN_MAX_SESSIONS + candidates are still reachable. + """ + sort_col = "updated_at" if order_by == "updated_at" else "created_at" + sort_desc = sort != "asc" + column = ( + AgentSession.updated_at + if sort_col == "updated_at" + else AgentSession.created_at + ) + + base_stmt = self._base_query(search) + # Narrow the scan with SQL predicates where group membership implies one + if group is InboxGroup.REVIEW_REQUIRED: + pending_exists = ( + select(Approval.id) + .where( + Approval.session_id == AgentSession.id, + Approval.status == ApprovalStatus.PENDING, + ) + .exists() + ) + base_stmt = base_stmt.where( + pending_exists, + AgentSession.entity_type.in_(APPROVAL_ENTITY_TYPES), + ) + elif group is InboxGroup.RUNNING: + # Necessary (not sufficient) condition: a live Temporal run exists + base_stmt = base_stmt.where(AgentSession.curr_run_id.is_not(None)) + + # Decode cursor as a scan-position keyset (sort_value + id of last + # scanned session). This lets subsequent pages resume exactly where + # the previous scan stopped instead of restarting from the top. + last_key: tuple[Any, uuid.UUID] | None = None + if cursor: + try: + cursor_data = self.decode_cursor(cursor) + last_key = (cursor_data.sort_value, uuid.UUID(cursor_data.id)) + except (ValueError, KeyError) as e: + logger.warning("Invalid grouped inbox cursor", error=str(e)) + + matches: list[InboxItemRead] = [] + scanned = 0 + exhausted = False + + while len(matches) < limit + 1 and scanned < GROUP_SCAN_MAX_SESSIONS: + stmt = base_stmt + if last_key is not None: + last_value, last_id = last_key + if sort_desc: + stmt = stmt.where( + or_( + column < last_value, + and_(column == last_value, AgentSession.id < last_id), + ) + ) + else: + stmt = stmt.where( + or_( + column > last_value, + and_(column == last_value, AgentSession.id > last_id), + ) + ) + order_clause = column.desc() if sort_desc else column.asc() + id_order = AgentSession.id.desc() if sort_desc else AgentSession.id.asc() + stmt = stmt.order_by(order_clause, id_order).limit(GROUP_SCAN_BATCH_SIZE) + + result = await self.session.execute(stmt) + sessions = list(result.scalars().all()) + if not sessions: + exhausted = True + break + + scanned += len(sessions) + last_session = sessions[-1] + last_key = (getattr(last_session, sort_col), last_session.id) + + items = await self._enrich_sessions(sessions) + matches.extend(item for item in items if self._classify_item(item) == group) + + if len(sessions) < GROUP_SCAN_BATCH_SIZE: + exhausted = True + break + + page_items = matches[:limit] + has_more = len(matches) > limit or not exhausted + + # Two cases where we can paginate forward: + # 1. We collected more matches than the page size — encode a cursor from + # the last returned item so the next request skips past it. + # 2. We hit the scan cap before exhausting the base query — encode the + # scan position (last_key) so the next request resumes scanning there. + next_cursor: str | None = None + if has_more and last_key is not None: + if len(matches) > limit: + # Item-based cursor: skip past the last item on this page. + last_item = page_items[-1] + next_cursor = self.encode_cursor( + id=last_item.id, + sort_column=sort_col, + sort_value=getattr(last_item, sort_col), + ) + else: + # Scan-position cursor: resume scanning from where we stopped. + scan_value, scan_id = last_key + next_cursor = self.encode_cursor( + id=scan_id, + sort_column=sort_col, + sort_value=scan_value, + ) + + prev_cursor: str | None = None + if cursor and page_items: + # For backwards compat: encode position of first returned item + first = page_items[0] + prev_cursor = self.encode_cursor( + id=first.id, + sort_column=sort_col, + sort_value=getattr(first, sort_col), + ) + + return CursorPaginatedResponse( + items=page_items, + next_cursor=next_cursor, + prev_cursor=prev_cursor, + has_more=has_more, + has_previous=cursor is not None, + total_estimate=None, + ) + async def count_pending_items(self) -> int: """Count pending approval inbox items for sessions shown in the inbox.""" stmt = ( @@ -280,7 +509,7 @@ async def count_pending_items(self) -> int: Approval.status == ApprovalStatus.PENDING, AgentSession.workspace_id == self.workspace_id, AgentSession.parent_session_id.is_(None), - AgentSession.entity_type.in_(["workflow", "external_channel"]), + AgentSession.entity_type.in_(APPROVAL_ENTITY_TYPES), ) ) count = await self.session.scalar(stmt) @@ -294,23 +523,23 @@ async def _enrich_sessions( if not sessions: return [] - session_ids = [s.id for s in sessions] - - # Fetch approvals for these sessions - approval_stmt = select(Approval).where( - Approval.workspace_id == self.workspace_id, - Approval.session_id.in_(session_ids), - ) - approval_result = await self.session.execute(approval_stmt) - approvals = approval_result.scalars().all() - - # Group approvals by session + # Fetch approvals for automation-initiated sessions only; chat-surface + # approvals are resolved inline in the chat UI and never shown here. + approval_session_ids = [ + s.id for s in sessions if s.entity_type in APPROVAL_ENTITY_TYPES + ] approvals_by_session: dict[uuid.UUID, list[Approval]] = {} - for approval in approvals: - if approval.session_id: - approvals_by_session.setdefault(approval.session_id, []).append( - approval - ) + if approval_session_ids: + approval_stmt = select(Approval).where( + Approval.workspace_id == self.workspace_id, + Approval.session_id.in_(approval_session_ids), + ) + approval_result = await self.session.execute(approval_stmt) + for approval in approval_result.scalars().all(): + if approval.session_id: + approvals_by_session.setdefault(approval.session_id, []).append( + approval + ) # Fetch workflow metadata for sessions with entity_id workflow_ids = {s.entity_id for s in sessions if s.entity_id} @@ -323,6 +552,16 @@ async def _enrich_sessions( workflows = workflow_result.scalars().all() workflows_by_id = {w.id: w for w in workflows} + # Fetch creators for user-initiated sessions + creator_ids = {s.created_by for s in sessions if s.created_by} + users_by_id: dict[uuid.UUID, User] = {} + if creator_ids: + user_stmt = select(User).where( + User.id.in_(list(creator_ids)) # pyright: ignore[reportAttributeAccessIssue] + ) + user_result = await self.session.execute(user_stmt) + users_by_id = {u.id: u for u in user_result.scalars().all()} + # Transform to InboxItemRead items: list[InboxItemRead] = [] for session in sessions: @@ -356,9 +595,22 @@ async def _enrich_sessions( preview = "Execution failed" elif failed_count > 0: preview = f"{failed_count} rejected" + elif temporal_status is None and not session_approvals: + preview = "Agent session" else: preview = "Execution completed" + # Get creator info + created_by: UserSummary | None = None + if session.created_by and session.created_by in users_by_id: + user = users_by_id[session.created_by] + created_by = UserSummary( + id=user.id, + email=user.email, + first_name=user.first_name, + last_name=user.last_name, + ) + # Get workflow info workflow_summary: WorkflowSummary | None = None title = session.title or "Agent session" @@ -393,7 +645,11 @@ async def _enrich_sessions( items.append( InboxItemRead( id=session.id, - type=InboxItemType.APPROVAL, + type=( + InboxItemType.APPROVAL + if session_approvals + else InboxItemType.AGENT_RUN + ), title=title, preview=preview, status=status, @@ -401,6 +657,7 @@ async def _enrich_sessions( created_at=session.created_at, updated_at=session.updated_at, workflow=workflow_summary, + created_by=created_by, source_id=session.id, # Always use parent session ID source_type="agent_session", metadata=metadata, diff --git a/tracecat/inbox/dependencies.py b/tracecat/inbox/dependencies.py index 77b7ef2d99..bbebfb5fb6 100644 --- a/tracecat/inbox/dependencies.py +++ b/tracecat/inbox/dependencies.py @@ -24,13 +24,13 @@ def get_inbox_providers( """ providers: list[InboxProvider] = [] - # EE: Add approvals provider if available + # EE: Add agent runs provider if available try: - from tracecat_ee.inbox.providers.approvals import ApprovalsInboxProvider + from tracecat_ee.inbox.providers.agent_runs import AgentRunsInboxProvider - providers.append(ApprovalsInboxProvider(session, role)) - logger.debug("Loaded ApprovalsInboxProvider") + providers.append(AgentRunsInboxProvider(session, role)) + logger.debug("Loaded AgentRunsInboxProvider") except ImportError: - logger.debug("ApprovalsInboxProvider not available (EE feature)") + logger.debug("AgentRunsInboxProvider not available (EE feature)") return providers diff --git a/tracecat/inbox/router.py b/tracecat/inbox/router.py index 4d93b57fca..6e68a8e7b2 100644 --- a/tracecat/inbox/router.py +++ b/tracecat/inbox/router.py @@ -11,6 +11,7 @@ from tracecat.inbox.dependencies import get_inbox_providers from tracecat.inbox.schemas import InboxItemRead, InboxPendingCount from tracecat.inbox.service import InboxService +from tracecat.inbox.types import InboxGroup from tracecat.logger import logger from tracecat.pagination import CursorPaginatedResponse @@ -49,6 +50,15 @@ async def list_items( sort: Literal["asc", "desc"] | None = Query( default=None, description="Sort direction (asc or desc)" ), + search: str | None = Query( + default=None, + max_length=200, + description="Case-insensitive search on item title", + ), + group: InboxGroup | None = Query( + default=None, + description="Filter items to a single display group", + ), ) -> CursorPaginatedResponse[InboxItemRead]: """List inbox items with cursor-based pagination. @@ -58,6 +68,8 @@ async def list_items( providers = get_inbox_providers(session, role) service = InboxService(session, role, providers) + search = search.strip() or None if search else None + try: return await service.list_items( limit=limit, @@ -65,6 +77,8 @@ async def list_items( reverse=reverse, order_by=order_by, sort=sort, + search=search, + group=group, ) except ValueError as e: logger.warning(f"Invalid request for list inbox items: {e}") diff --git a/tracecat/inbox/schemas.py b/tracecat/inbox/schemas.py index 8520525950..0269ad2e71 100644 --- a/tracecat/inbox/schemas.py +++ b/tracecat/inbox/schemas.py @@ -19,6 +19,15 @@ class WorkflowSummary(BaseModel): alias: str | None = Field(default=None, description="Workflow alias") +class UserSummary(BaseModel): + """Summary of a user for inbox item context.""" + + id: uuid.UUID = Field(..., description="User ID") + email: str = Field(..., description="User email") + first_name: str | None = Field(default=None, description="User first name") + last_name: str | None = Field(default=None, description="User last name") + + class InboxItemRead(BaseModel): """Read model for inbox items.""" @@ -33,6 +42,10 @@ class InboxItemRead(BaseModel): workflow: WorkflowSummary | None = Field( default=None, description="Associated workflow" ) + created_by: UserSummary | None = Field( + default=None, + description="User who created the source entity (None for automation-initiated items)", + ) source_id: uuid.UUID = Field(..., description="ID of the source entity") source_type: str = Field( ..., description="Type of source entity (e.g., agent_session)" diff --git a/tracecat/inbox/service.py b/tracecat/inbox/service.py index ad6e3d1d28..0bdcf3eb72 100644 --- a/tracecat/inbox/service.py +++ b/tracecat/inbox/service.py @@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Literal from tracecat.inbox.schemas import InboxItemRead -from tracecat.inbox.types import InboxItemStatus +from tracecat.inbox.types import InboxGroup, InboxItemStatus from tracecat.pagination import BaseCursorPaginator, CursorPaginatedResponse from tracecat.service import BaseWorkspaceService @@ -39,18 +39,63 @@ async def list_items( reverse: bool = False, order_by: str | None = None, sort: Literal["asc", "desc"] | None = None, + search: str | None = None, + group: InboxGroup | None = None, ) -> CursorPaginatedResponse[InboxItemRead]: """List inbox items with cursor-based pagination. - For the initial implementation, this delegates pagination to providers - and merges results. Future optimization could use a materialized view - or unified table for more efficient cross-provider pagination. + With a single provider, delegates pagination directly so the provider's + SQL keyset pagination is used end-to-end. With multiple providers, + falls back to in-memory aggregation. """ - # For now, use simple aggregation with in-memory pagination - # This works well for small-medium inbox sizes - # TODO: Optimize for large inboxes with cross-provider cursor pagination + if len(self.providers) == 1: + return await self.providers[0].list_items( + limit=limit, + cursor=cursor, + reverse=reverse, + order_by=order_by, + sort=sort, + search=search, + group=group, + ) + + # Multi-provider: aggregate in memory. + # Grouped queries pass cursor straight through; each provider owns its + # own scan-position cursors and returns the correct page directly. + if group is not None: + all_items: list[InboxItemRead] = [] + next_cursor: str | None = None + prev_cursor: str | None = None + has_more = False + has_previous = False + for provider in self.providers: + provider_response = await provider.list_items( + limit=limit, + cursor=cursor, + reverse=reverse, + order_by=order_by, + sort=sort, + search=search, + group=group, + ) + all_items.extend(provider_response.items) + if provider_response.has_more: + has_more = True + next_cursor = provider_response.next_cursor + if provider_response.has_previous: + has_previous = True + prev_cursor = provider_response.prev_cursor + return CursorPaginatedResponse( + items=all_items, + next_cursor=next_cursor, + prev_cursor=prev_cursor, + has_more=has_more, + has_previous=has_previous, + total_estimate=None, + ) - all_items: list[InboxItemRead] = [] + all_items_ungrouped: list[InboxItemRead] = [] + providers_have_more = False # Decode cursor to get target item ID if present cursor_id: str | None = None @@ -58,40 +103,38 @@ async def list_items( cursor_data = self.decode_cursor(cursor) cursor_id = cursor_data.id - # Initial fetch - get enough items to likely include cursor position. - # For cross-provider aggregation, we fetch a larger window to increase - # the likelihood of including the cursor item. This is a known limitation - # of in-memory aggregation; very large inboxes may need a unified table. - # Fetch more when we have a cursor to search for. + # Fetch a larger window per provider so the cursor item is likely within + # the fetched set after merging. Known limitation: very large inboxes + # should use a unified table instead. initial_fetch_limit = limit * 10 if cursor_id else limit * 2 for provider in self.providers: - # Fetch items without cursor - handle pagination at aggregate level provider_response = await provider.list_items( limit=initial_fetch_limit, cursor=None, reverse=reverse, order_by=order_by, sort=sort, + search=search, + group=None, ) - all_items.extend(provider_response.items) + all_items_ungrouped.extend(provider_response.items) + providers_have_more = providers_have_more or provider_response.has_more - # Sort all items + # Sort merged results sort_desc = sort != "asc" if order_by == "created_at" or order_by is None: - all_items.sort( + all_items_ungrouped.sort( key=lambda x: x.created_at.timestamp(), reverse=sort_desc, ) elif order_by == "updated_at": - all_items.sort( + all_items_ungrouped.sort( key=lambda x: x.updated_at.timestamp(), reverse=sort_desc, ) elif order_by == "status": - # Status priority: pending first (asc) or completed first (desc) - # Secondary sort by created_at in the same direction - all_items.sort( + all_items_ungrouped.sort( key=lambda x: ( x.status != InboxItemStatus.PENDING, x.created_at.timestamp() @@ -104,23 +147,29 @@ async def list_items( # Find cursor position in merged results start_idx = 0 if cursor_id: - for i, item in enumerate(all_items): - if item.id == cursor_id: + for i, item in enumerate(all_items_ungrouped): + if str(item.id) == cursor_id: start_idx = i + 1 if not reverse else i break - # Slice items + # Slice + end_idx = start_idx if reverse: - end_idx = start_idx start_idx = max(0, end_idx - limit) - items = all_items[start_idx:end_idx] + items = all_items_ungrouped[start_idx:end_idx] items.reverse() else: - items = all_items[start_idx : start_idx + limit] + items = all_items_ungrouped[start_idx : start_idx + limit] # Determine pagination state - has_more = start_idx + limit < len(all_items) - has_previous = start_idx > 0 + if reverse: + has_more = start_idx > 0 + has_previous = end_idx < len(all_items_ungrouped) or providers_have_more + else: + has_more = ( + start_idx + limit < len(all_items_ungrouped) or providers_have_more + ) + has_previous = start_idx > 0 # Generate cursors next_cursor = None @@ -148,7 +197,7 @@ async def list_items( prev_cursor=prev_cursor, has_more=has_more, has_previous=has_previous, - total_estimate=len(all_items), + total_estimate=len(all_items_ungrouped), ) async def count_pending_items(self) -> int: diff --git a/tracecat/inbox/types.py b/tracecat/inbox/types.py index 872bd984ab..b9b7be58c9 100644 --- a/tracecat/inbox/types.py +++ b/tracecat/inbox/types.py @@ -14,6 +14,7 @@ class InboxItemType(StrEnum): """Types of inbox items.""" APPROVAL = "approval" + AGENT_RUN = "agent_run" # Future types: # MENTION = "mention" # ASSIGNMENT = "assignment" @@ -27,6 +28,19 @@ class InboxItemStatus(StrEnum): FAILED = "failed" +class InboxGroup(StrEnum): + """Display groups for inbox items. + + Groups are derived from approval state and live workflow execution status, + so membership cannot be expressed as a pure SQL filter. + """ + + REVIEW_REQUIRED = "review_required" + RUNNING = "running" + ERROR = "error" + COMPLETED = "completed" + + class InboxProvider(Protocol): """Protocol for inbox item providers. @@ -43,6 +57,8 @@ async def list_items( reverse: bool = False, order_by: str | None = None, sort: Literal["asc", "desc"] | None = None, + search: str | None = None, + group: InboxGroup | None = None, ) -> CursorPaginatedResponse[InboxItemRead]: """List inbox items with cursor-based pagination.""" ...