From 0c084e7be2cd22c7988cc7cf2bfd030484e281b9 Mon Sep 17 00:00:00 2001 From: Daryl Lim <5508348+daryllimyt@users.noreply.github.com> Date: Fri, 12 Jun 2026 17:13:58 +0800 Subject: [PATCH 1/6] fix(sync): close the sync cycle with per-resource pull reconciliation - Pull now blocks only on per-resource conflicts (local AND remote changed with differing specs) instead of any local drift, and rebaselines convergent resources so export -> merge -> pull ends clean (ENG-1472) - Track resource hashes in two spaces: last_synced_spec_hash (repo spec) and last_projected_spec_hash (local projection), recorded from a post-reconcile re-projection that doubles as the P(W_next) == S postcondition check (ENG-1475) - Stop advancing mapping sync metadata on branch export; it only moves on pull when the base advances (ENG-1473) - Apply mapping-style source id dedup in read-only projections so duplicate slugs cannot shadow workflows (ENG-1474) - Drop the unwritten WorkspaceSyncEvent provenance table from the v1 migration (journal ships post-v1, ENG-1477) and fold rendered_files into the base migration; reparent onto 9b52f7f18a31 --- ...4e2a1c9d8_add_workspace_git_sync_tables.py | 56 +-- ...ered_files_to_workspace_sync_changesets.py | 36 -- tests/unit/test_workspace_sync.py | 393 +++++++++++++++++- tracecat/db/models.py | 40 +- tracecat/db/tenant_rls.py | 1 - tracecat/workspace_sync/service.py | 216 ++++++++-- 6 files changed, 573 insertions(+), 169 deletions(-) delete mode 100644 alembic/versions/b3f7a92d1c4e_add_rendered_files_to_workspace_sync_changesets.py diff --git a/alembic/versions/25f4e2a1c9d8_add_workspace_git_sync_tables.py b/alembic/versions/25f4e2a1c9d8_add_workspace_git_sync_tables.py index 2a685dfb7..1e7ef46aa 100644 --- a/alembic/versions/25f4e2a1c9d8_add_workspace_git_sync_tables.py +++ b/alembic/versions/25f4e2a1c9d8_add_workspace_git_sync_tables.py @@ -28,7 +28,6 @@ WORKSPACE_SYNC_TABLES = ( "workspace_sync_state", "workspace_sync_resource_mapping", - 
"workspace_sync_event", "workspace_sync_changeset", "workspace_sync_changeset_item", "workspace_sync_materialization", @@ -142,6 +141,7 @@ def upgrade() -> None: sa.Column("local_id", postgresql.UUID(as_uuid=True), nullable=False), sa.Column("last_synced_commit_sha", sa.String(), nullable=True), sa.Column("last_synced_spec_hash", sa.String(), nullable=True), + sa.Column("last_projected_spec_hash", sa.String(), nullable=True), sa.Column( "sync_status", sa.String(length=32), @@ -182,53 +182,6 @@ def upgrade() -> None: unique=False, ) - op.create_table( - "workspace_sync_event", - *_record_columns(), - *_tenant_columns(), - sa.Column( - "provider", sa.String(length=32), server_default="git", nullable=False - ), - sa.Column("resource_type", sa.String(length=64), nullable=False), - sa.Column("source_id", sa.String(), nullable=True), - sa.Column("local_id", postgresql.UUID(as_uuid=True), nullable=True), - sa.Column("operation", sa.String(length=32), nullable=False), - sa.Column("actor_id", postgresql.UUID(as_uuid=True), nullable=True), - sa.Column("base_commit_sha", sa.String(), nullable=True), - sa.Column("before_spec_hash", sa.String(), nullable=True), - sa.Column("after_spec_hash", sa.String(), nullable=True), - sa.Column( - "affected_paths", - postgresql.JSONB(astext_type=sa.Text()), - server_default=sa.text("'[]'::jsonb"), - nullable=False, - ), - sa.Column( - "metadata", - postgresql.JSONB(astext_type=sa.Text()), - server_default=sa.text("'{}'::jsonb"), - nullable=False, - ), - sa.Column("superseded_by", postgresql.UUID(as_uuid=True), nullable=True), - sa.Column("changeset_id", postgresql.UUID(as_uuid=True), nullable=True), - *_timestamps(), - *_tenant_fks("workspace_sync_event"), - sa.PrimaryKeyConstraint("surrogate_id", name=op.f("pk_workspace_sync_event")), - sa.UniqueConstraint("id", name=op.f("uq_workspace_sync_event_id")), - ) - op.create_index( - op.f("ix_workspace_sync_event_id"), - "workspace_sync_event", - ["id"], - unique=True, - ) - op.create_index( - 
op.f("ix_workspace_sync_event_workspace_id"), - "workspace_sync_event", - ["workspace_id"], - unique=False, - ) - op.create_table( "workspace_sync_changeset", *_record_columns(), @@ -252,6 +205,12 @@ def upgrade() -> None: server_default=sa.text("'[]'::jsonb"), nullable=False, ), + sa.Column( + "rendered_files", + postgresql.JSONB(astext_type=sa.Text()), + server_default=sa.text("'{}'::jsonb"), + nullable=False, + ), sa.Column( "validation_status", sa.String(length=32), @@ -415,6 +374,5 @@ def downgrade() -> None: op.drop_table("workspace_sync_materialization") op.drop_table("workspace_sync_changeset_item") op.drop_table("workspace_sync_changeset") - op.drop_table("workspace_sync_event") op.drop_table("workspace_sync_resource_mapping") op.drop_table("workspace_sync_state") diff --git a/alembic/versions/b3f7a92d1c4e_add_rendered_files_to_workspace_sync_changesets.py b/alembic/versions/b3f7a92d1c4e_add_rendered_files_to_workspace_sync_changesets.py deleted file mode 100644 index c8540af71..000000000 --- a/alembic/versions/b3f7a92d1c4e_add_rendered_files_to_workspace_sync_changesets.py +++ /dev/null @@ -1,36 +0,0 @@ -"""add rendered files to workspace sync changesets - -Revision ID: b3f7a92d1c4e -Revises: 25f4e2a1c9d8 -Create Date: 2026-06-07 00:00:00.000000 - -""" - -from collections.abc import Sequence - -import sqlalchemy as sa -from sqlalchemy.dialects import postgresql - -from alembic import op - -# revision identifiers, used by Alembic. 
-revision: str = "b3f7a92d1c4e" -down_revision: str | None = "25f4e2a1c9d8" -branch_labels: str | Sequence[str] | None = None -depends_on: str | Sequence[str] | None = None - - -def upgrade() -> None: - op.add_column( - "workspace_sync_changeset", - sa.Column( - "rendered_files", - postgresql.JSONB(astext_type=sa.Text()), - server_default=sa.text("'{}'::jsonb"), - nullable=False, - ), - ) - - -def downgrade() -> None: - op.drop_column("workspace_sync_changeset", "rendered_files") diff --git a/tests/unit/test_workspace_sync.py b/tests/unit/test_workspace_sync.py index 1610ac160..8945d341b 100644 --- a/tests/unit/test_workspace_sync.py +++ b/tests/unit/test_workspace_sync.py @@ -24,9 +24,10 @@ ) from tracecat.dsl.common import DSLEntrypoint, DSLInput from tracecat.dsl.schemas import ActionStatement +from tracecat.git.utils import parse_git_url from tracecat.identifiers.workflow import WorkflowUUID from tracecat.registry.lock.types import RegistryLock -from tracecat.sync import CommitInfo, PushStatus +from tracecat.sync import CommitInfo, PullOptions, PushStatus from tracecat.workflow.management.definitions import WorkflowDefinitionsService from tracecat.workflow.management.management import WorkflowsManagementService from tracecat.workflow.store.schemas import RemoteWorkflowDefinition @@ -92,7 +93,7 @@ def test_workflow_spec_does_not_serialize_local_uuid(sample_dsl: DSLInput) -> No folder=None, schedules=[], webhook=SimpleNamespace( - methods=["POST"], status="online", include_headers=False + methods=["POST"], status="online", include_headers=True ), case_trigger=SimpleNamespace( status="offline", @@ -111,6 +112,10 @@ def test_workflow_spec_does_not_serialize_local_uuid(sample_dsl: DSLInput) -> No assert "detect-okta-risk" in content assert str(local_id) not in content assert "wf_" not in content + # include_headers must survive a projection round-trip + assert spec.webhook is not None + assert spec.webhook.include_headers is True + assert "include_headers: true" in 
content def test_workflow_spec_includes_configured_case_trigger(sample_dsl: DSLInput) -> None: @@ -236,7 +241,7 @@ async def _create_local_workflow( session: AsyncSession, role: Role, dsl: DSLInput, - alias: str, + alias: str | None, ) -> Workflow: with patch( "tracecat.workflow.management.management.RegistryLockService.resolve_lock_with_bindings", @@ -389,3 +394,385 @@ async def test_status_pending_changeset_and_export_with_mocked_github( ] ) assert written_workflow["definition"]["title"] == "Detect Okta Risk" + + +TEST_REPO_URL = "git+ssh://git@github.com/test-org/test-repo.git" + + +def _registry_lock_patch(): + return patch( + "tracecat.workflow.management.management.RegistryLockService.resolve_lock_with_bindings", + new=AsyncMock( + return_value=RegistryLock( + origins={"tracecat_registry": "test"}, + actions={"core.transform.passthrough": "tracecat_registry"}, + ) + ), + ) + + +def _reset_fake_transport(files: dict[str, str] | None = None) -> None: + FakeGitHubSyncTransport.files = files or {} + FakeGitHubSyncTransport.written_files = None + FakeGitHubSyncTransport.written_branch = None + FakeGitHubSyncTransport.written_create_pr = None + + +async def _mutate_workflow_definition( + session: AsyncSession, + *, + workspace_id: uuid.UUID | None, + alias: str, + **content_updates: object, +) -> None: + definition = await session.scalar( + select(WorkflowDefinition).where( + WorkflowDefinition.workspace_id == workspace_id, + WorkflowDefinition.alias == alias, + ) + ) + assert definition is not None + definition.content = {**definition.content, **content_updates} + session.add(definition) + await session.commit() + + +@pytest.mark.anyio +@pytest.mark.usefixtures("db") +async def test_full_sync_cycle_rebaselines_after_merge( + session: AsyncSession, + svc_role: Role, + svc_workspace, + sample_dsl: DSLInput, +) -> None: + """Edit -> changeset -> export -> merge -> pull must end CLEAN, repeatedly.""" + svc_workspace.settings = {"git_repo_url": TEST_REPO_URL} + 
session.add(svc_workspace) + await _create_local_workflow( + session=session, + role=svc_role, + dsl=sample_dsl, + alias="detect-okta-risk", + ) + _reset_fake_transport() + url = parse_git_url(TEST_REPO_URL, allowed_domains={"github.com"}) + + with ( + patch( + "tracecat.workspace_sync.service.WorkspaceGitHubSyncService", + FakeGitHubSyncTransport, + ), + _registry_lock_patch(), + ): + service = WorkspaceGitSyncService(session=session, role=svc_role) + + changeset = await service.create_changeset( + params=ChangeSetCreate( + title="Export workflow", + resources=[ + ResourceRef( + resource_type="workflow", + source_id="detect-okta-risk", + ) + ], + ) + ) + await service.export_changeset( + changeset_id=changeset.id, + params=ChangeSetExport( + message="Export workflow", + branch="sync/detect-okta-risk", + create_pr=False, + ), + ) + + # Export lands on a branch, not the base: the change must stay pending. + pending = await service.list_pending_changes() + assert len(pending.changes) == 1 + mapping = await session.scalar( + select(WorkspaceSyncResourceMapping).where( + WorkspaceSyncResourceMapping.workspace_id == svc_role.workspace_id + ) + ) + assert mapping is not None + assert mapping.last_synced_spec_hash is None + assert mapping.last_projected_spec_hash is None + + # Simulate merging the export PR into the base branch. + assert FakeGitHubSyncTransport.written_files is not None + FakeGitHubSyncTransport.files = dict(FakeGitHubSyncTransport.written_files) + + result = await service.pull(url=url, options=PullOptions(commit_sha="a" * 40)) + assert result.success, result.message + # Local and remote already agree: rebaseline without re-importing. + assert result.workflows_imported == 0 + + status = await service.get_status() + assert status.status == "clean" + assert status.pending_change_count == 0 + assert (await service.list_pending_changes()).changes == [] + + # Second cycle: local edit shows as pending update, then syncs clean. 
+ await _mutate_workflow_definition( + session, + workspace_id=svc_role.workspace_id, + alias="detect-okta-risk", + title="Detect Okta Risk v2", + ) + pending = await service.list_pending_changes() + assert len(pending.changes) == 1 + assert pending.changes[0].operation == "update" + status = await service.get_status() + assert status.status == "local_dirty" + + changeset = await service.create_changeset( + params=ChangeSetCreate( + title="Export v2", + resources=[ + ResourceRef( + resource_type="workflow", + source_id="detect-okta-risk", + ) + ], + ) + ) + await service.export_changeset( + changeset_id=changeset.id, + params=ChangeSetExport( + message="Export v2", + branch="sync/detect-okta-risk-v2", + create_pr=False, + ), + ) + assert FakeGitHubSyncTransport.written_files is not None + FakeGitHubSyncTransport.files = dict(FakeGitHubSyncTransport.written_files) + + result = await service.pull(url=url, options=PullOptions(commit_sha="b" * 40)) + assert result.success, result.message + status = await service.get_status() + assert status.status == "clean" + assert status.pending_change_count == 0 + + +@pytest.mark.anyio +@pytest.mark.usefixtures("db") +async def test_pull_imports_remote_only_changes_and_stays_clean( + session: AsyncSession, + svc_role: Role, + svc_workspace, + sample_dsl: DSLInput, +) -> None: + svc_workspace.settings = {"git_repo_url": TEST_REPO_URL} + session.add(svc_workspace) + await _create_local_workflow( + session=session, + role=svc_role, + dsl=sample_dsl, + alias="detect-okta-risk", + ) + _reset_fake_transport() + url = parse_git_url(TEST_REPO_URL, allowed_domains={"github.com"}) + + with ( + patch( + "tracecat.workspace_sync.service.WorkspaceGitHubSyncService", + FakeGitHubSyncTransport, + ), + _registry_lock_patch(), + ): + service = WorkspaceGitSyncService(session=session, role=svc_role) + changeset = await service.create_changeset( + params=ChangeSetCreate( + title="Export workflow", + resources=[ + ResourceRef( + 
resource_type="workflow", + source_id="detect-okta-risk", + ) + ], + ) + ) + await service.export_changeset( + changeset_id=changeset.id, + params=ChangeSetExport( + message="Export workflow", + branch="sync/detect-okta-risk", + create_pr=False, + ), + ) + assert FakeGitHubSyncTransport.written_files is not None + FakeGitHubSyncTransport.files = dict(FakeGitHubSyncTransport.written_files) + result = await service.pull(url=url, options=PullOptions(commit_sha="a" * 40)) + assert result.success, result.message + + # Someone edits the workflow directly in the repository. + path = "workflows/detect-okta-risk/definition.yml" + remote = yaml.safe_load(FakeGitHubSyncTransport.files[path]) + remote["definition"]["description"] = "Edited in GitHub" + FakeGitHubSyncTransport.files = { + **FakeGitHubSyncTransport.files, + path: yaml.safe_dump(remote, sort_keys=False), + } + + status = await service.get_status() + assert status.status == "remote_ahead" + + result = await service.pull(url=url, options=PullOptions(commit_sha="b" * 40)) + assert result.success, result.message + assert result.workflows_imported == 1 + + status = await service.get_status() + assert status.status == "clean" + assert status.pending_change_count == 0 + + +@pytest.mark.anyio +@pytest.mark.usefixtures("db") +async def test_pull_blocks_on_conflicting_local_and_remote_changes( + session: AsyncSession, + svc_role: Role, + svc_workspace, + sample_dsl: DSLInput, +) -> None: + svc_workspace.settings = {"git_repo_url": TEST_REPO_URL} + session.add(svc_workspace) + await _create_local_workflow( + session=session, + role=svc_role, + dsl=sample_dsl, + alias="detect-okta-risk", + ) + _reset_fake_transport() + url = parse_git_url(TEST_REPO_URL, allowed_domains={"github.com"}) + + with ( + patch( + "tracecat.workspace_sync.service.WorkspaceGitHubSyncService", + FakeGitHubSyncTransport, + ), + _registry_lock_patch(), + ): + service = WorkspaceGitSyncService(session=session, role=svc_role) + changeset = await 
service.create_changeset( + params=ChangeSetCreate( + title="Export workflow", + resources=[ + ResourceRef( + resource_type="workflow", + source_id="detect-okta-risk", + ) + ], + ) + ) + await service.export_changeset( + changeset_id=changeset.id, + params=ChangeSetExport( + message="Export workflow", + branch="sync/detect-okta-risk", + create_pr=False, + ), + ) + assert FakeGitHubSyncTransport.written_files is not None + FakeGitHubSyncTransport.files = dict(FakeGitHubSyncTransport.written_files) + result = await service.pull(url=url, options=PullOptions(commit_sha="a" * 40)) + assert result.success, result.message + + # Diverge: local title edit and a different remote description edit. + await _mutate_workflow_definition( + session, + workspace_id=svc_role.workspace_id, + alias="detect-okta-risk", + title="Detect Okta Risk v2", + ) + path = "workflows/detect-okta-risk/definition.yml" + remote = yaml.safe_load(FakeGitHubSyncTransport.files[path]) + remote["definition"]["description"] = "Edited in GitHub" + FakeGitHubSyncTransport.files = { + **FakeGitHubSyncTransport.files, + path: yaml.safe_dump(remote, sort_keys=False), + } + + status = await service.get_status() + assert status.status == "conflicted" + + result = await service.pull(url=url, options=PullOptions(commit_sha="b" * 40)) + assert not result.success + assert result.diagnostics + assert result.diagnostics[0].error_type == "conflict" + assert ( + result.diagnostics[0].workflow_path + == "workflows/detect-okta-risk/definition.yml" + ) + + +@pytest.mark.anyio +@pytest.mark.usefixtures("db") +async def test_pull_handwritten_spec_without_webhook_stays_clean( + session: AsyncSession, + svc_role: Role, + svc_workspace, + sample_dsl: DSLInput, +) -> None: + """A repo file omitting optional keys must not leave the workspace dirty.""" + svc_workspace.settings = {"git_repo_url": TEST_REPO_URL} + session.add(svc_workspace) + await session.commit() + _reset_fake_transport( + { + 
"workflows/remote-only/definition.yml": yaml.safe_dump( + { + "version": 1, + "type": "workflow", + "id": "remote-only", + "alias": "remote-only", + "definition": sample_dsl.model_dump(mode="json", exclude_none=True), + }, + sort_keys=False, + ) + } + ) + url = parse_git_url(TEST_REPO_URL, allowed_domains={"github.com"}) + + with ( + patch( + "tracecat.workspace_sync.service.WorkspaceGitHubSyncService", + FakeGitHubSyncTransport, + ), + _registry_lock_patch(), + ): + service = WorkspaceGitSyncService(session=session, role=svc_role) + result = await service.pull(url=url, options=PullOptions(commit_sha="a" * 40)) + assert result.success, result.message + assert result.workflows_imported == 1 + + status = await service.get_status() + assert status.status == "clean" + assert status.pending_change_count == 0 + assert (await service.list_pending_changes()).changes == [] + + +@pytest.mark.anyio +@pytest.mark.usefixtures("db") +async def test_readonly_projection_dedupes_duplicate_slugs( + session: AsyncSession, + svc_role: Role, + sample_dsl: DSLInput, +) -> None: + """Read-only and mapping-creating projections must assign identical source ids.""" + await _create_local_workflow( + session=session, role=svc_role, dsl=sample_dsl, alias=None + ) + await _create_local_workflow( + session=session, role=svc_role, dsl=sample_dsl, alias=None + ) + service = WorkspaceGitSyncService(session=session, role=svc_role) + + readonly = await service.project_workspace(create_missing_mappings=False) + assert set(readonly.spec.workflows) == { + "detect-okta-risk", + "detect-okta-risk-2", + } + + creating = await service.project_workspace(create_missing_mappings=True) + assert set(creating.spec.workflows) == set(readonly.spec.workflows) + assert creating.spec_hash == readonly.spec_hash diff --git a/tracecat/db/models.py b/tracecat/db/models.py index 58a031c4f..4ad5d8c88 100644 --- a/tracecat/db/models.py +++ b/tracecat/db/models.py @@ -600,6 +600,7 @@ class 
WorkspaceSyncResourceMapping(RecordModel): local_id: Mapped[uuid.UUID] = mapped_column(UUID, nullable=False) last_synced_commit_sha: Mapped[str | None] = mapped_column(String, nullable=True) last_synced_spec_hash: Mapped[str | None] = mapped_column(String, nullable=True) + last_projected_spec_hash: Mapped[str | None] = mapped_column(String, nullable=True) sync_status: Mapped[str] = mapped_column( String(32), default="untracked", @@ -608,45 +609,6 @@ class WorkspaceSyncResourceMapping(RecordModel): ) -class WorkspaceSyncEvent(RecordModel): - """Append-only provenance for Git-relevant workspace mutations.""" - - __tablename__ = "workspace_sync_event" - - id: Mapped[uuid.UUID] = mapped_column( - UUID, default=uuid.uuid4, nullable=False, unique=True, index=True - ) - workspace_id: Mapped[WorkspaceID] = mapped_column( - UUID, - ForeignKey("workspace.id", ondelete="CASCADE"), - nullable=False, - index=True, - ) - provider: Mapped[str] = mapped_column( - String(32), default="git", server_default=text("'git'"), nullable=False - ) - resource_type: Mapped[str] = mapped_column(String(64), nullable=False) - source_id: Mapped[str | None] = mapped_column(String, nullable=True) - local_id: Mapped[uuid.UUID | None] = mapped_column(UUID, nullable=True) - operation: Mapped[str] = mapped_column(String(32), nullable=False) - actor_id: Mapped[uuid.UUID | None] = mapped_column(UUID, nullable=True) - base_commit_sha: Mapped[str | None] = mapped_column(String, nullable=True) - before_spec_hash: Mapped[str | None] = mapped_column(String, nullable=True) - after_spec_hash: Mapped[str | None] = mapped_column(String, nullable=True) - affected_paths: Mapped[list[str]] = mapped_column( - JSONB, default=list, server_default=text("'[]'::jsonb"), nullable=False - ) - metadata_: Mapped[dict[str, Any]] = mapped_column( - "metadata", - JSONB, - default=dict, - server_default=text("'{}'::jsonb"), - nullable=False, - ) - superseded_by: Mapped[uuid.UUID | None] = mapped_column(UUID, nullable=True) - 
changeset_id: Mapped[uuid.UUID | None] = mapped_column(UUID, nullable=True) - - class WorkspaceSyncChangeSet(RecordModel): """Tracecat-side review and export unit for syncable workspace changes.""" diff --git a/tracecat/db/tenant_rls.py b/tracecat/db/tenant_rls.py index b1ae913ce..7205ccc22 100644 --- a/tracecat/db/tenant_rls.py +++ b/tracecat/db/tenant_rls.py @@ -86,7 +86,6 @@ "agent_preset_version_skill", "workspace_sync_state", "workspace_sync_resource_mapping", - "workspace_sync_event", "workspace_sync_changeset", "workspace_sync_changeset_item", "workspace_sync_materialization", diff --git a/tracecat/workspace_sync/service.py b/tracecat/workspace_sync/service.py index 75922e25c..1efc24831 100644 --- a/tracecat/workspace_sync/service.py +++ b/tracecat/workspace_sync/service.py @@ -115,7 +115,16 @@ async def project_workspace( create=create_missing_mappings, reserved_source_ids=set(specs), ) - source_id = mapping.source_id if mapping else preferred_source_id + if mapping is not None: + source_id = mapping.source_id + else: + # Read-only projection: apply the same dedup as mapping creation + # so both paths assign identical source ids. 
+ source_id = await self._unique_source_id( + resource_type=SyncResourceType.WORKFLOW.value, + preferred_source_id=preferred_source_id, + reserved_source_ids=set(specs), + ) spec = workflow_spec_from_orm(workflow, dsl=dsl, source_id=source_id) specs[source_id] = spec @@ -239,7 +248,19 @@ async def pull( local_projection = await self.project_workspace(create_missing_mappings=True) state = await self._get_or_create_state(url=url) - if state.base_spec_hash and local_projection.spec_hash != state.base_spec_hash: + pending = await self._pending_changes_from_projection( + projection=local_projection, + state=state, + ) + local_changed = {change.source_id for change in pending.changes} + remote_changed = await self._remote_changed_source_ids(snapshot) + convergent = self._convergent_source_ids( + local_spec=local_projection.spec, + remote_spec=snapshot.spec, + candidates=local_changed & remote_changed, + ) + conflicts = sorted((local_changed & remote_changed) - convergent) + if conflicts: return PullResult( success=False, commit_sha=snapshot.commit_sha, @@ -247,33 +268,56 @@ async def pull( workflows_imported=0, diagnostics=[ PullDiagnostic( - workflow_path="", + workflow_path=workflow_source_path(source_id), workflow_title=None, error_type="conflict", message=( - "Local syncable workspace state changed since the last " - "synced base. Export or discard local changes before pulling." + "Resource changed both locally and in the repository " + "since the last sync. Export or discard the local " + "change before pulling." 
), - details={ - "base_spec_hash": state.base_spec_hash, - "local_spec_hash": local_projection.spec_hash, - }, + details={"source_id": source_id}, ) + for source_id in conflicts ], - message="Pull blocked by local workspace drift", + message=f"Pull blocked by {len(conflicts)} conflicting resource(s)", ) + spec_to_apply = WorkspaceSpec( + workflows={ + source_id: workflow_spec + for source_id, workflow_spec in snapshot.spec.workflows.items() + if source_id in remote_changed and source_id not in convergent + } + ) result = await self._reconcile_workflow_specs( - spec=snapshot.spec, + spec=spec_to_apply, commit_sha=snapshot.commit_sha, ) - if result.success: - await self._record_successful_pull( - state=state, - snapshot=snapshot, - url=url, - ) - return result + if not result.success: + return result + + await self._rebaseline_convergent_mappings( + source_ids=convergent, + local_spec=local_projection.spec, + commit_sha=snapshot.commit_sha, + ) + await self._record_successful_pull( + state=state, + snapshot=snapshot, + url=url, + ) + return PullResult( + success=True, + commit_sha=snapshot.commit_sha, + workflows_found=len(snapshot.spec.workflows), + workflows_imported=result.workflows_imported, + diagnostics=[], + message=( + f"Imported {result.workflows_imported} changed workflow(s); " + f"{len(snapshot.spec.workflows) - result.workflows_imported} already up to date" + ), + ) async def get_status(self) -> WorkspaceSyncStatus: """Return the workspace/repo three-way sync status for the configured repo.""" @@ -304,16 +348,19 @@ async def get_status(self) -> WorkspaceSyncStatus: projection=local_projection, state=state, ) - remote_changed_source_ids = await self._remote_changed_source_ids( - remote_snapshot + local_changed = {change.source_id for change in pending.changes} + remote_changed = await self._remote_changed_source_ids(remote_snapshot) + convergent = self._convergent_source_ids( + local_spec=local_projection.spec, + remote_spec=remote_snapshot.spec if 
remote_snapshot else None, + candidates=local_changed & remote_changed, ) remote_spec_hash = remote_snapshot.spec_hash if remote_snapshot else None status = self._classify_status( - base_spec_hash=state.base_spec_hash, - local_spec_hash=local_projection.spec_hash, - remote_spec_hash=remote_spec_hash, - local_changed_source_ids={change.source_id for change in pending.changes}, - remote_changed_source_ids=remote_changed_source_ids, + has_base=state.base_commit_sha is not None, + has_remote=remote_snapshot is not None, + local_changed_source_ids=local_changed - convergent, + remote_changed_source_ids=remote_changed, remote_diagnostics=diagnostics, ) @@ -488,12 +535,10 @@ async def export_workflow( ), ) changeset.status = ChangeSetStatus.EXPORTED.value - mapping.source_path = workflow_source_path(workflow_spec.id) - mapping.last_synced_commit_sha = commit.sha - mapping.last_synced_spec_hash = stable_hash(workflow_spec) - mapping.sync_status = ResourceSyncStatus.SYNCED.value + # Mapping sync metadata only advances on pull: the export landed on a + # branch, not the synced base, so the resource is still pending. workflow.git_sync_branch = commit.ref - self.session.add_all([materialization, changeset, mapping, workflow]) + self.session.add_all([materialization, changeset, workflow]) await self.session.commit() return WorkspaceSyncExportResult(changeset_id=changeset.id, commit=commit) @@ -677,9 +722,74 @@ async def _reconcile_workflow_specs( commit_sha=commit_sha, spec_hash=stable_hash(workflow_spec), ) + await self.session.flush() + await self._record_projected_hashes(workflow_ids=list(local_ids.values())) await self.session.commit() return result + async def _record_projected_hashes( + self, + *, + workflow_ids: list[WorkflowUUID], + ) -> None: + """Re-project imported workflows and record their local-space hashes. 
+ + ``last_synced_spec_hash`` lives in repo-spec space while pending-change + detection compares local projections, so the post-reconcile projection + is the postcondition check (``P(W_next) == S``) and the baseline for + future pending-change comparisons. + """ + if not workflow_ids: + return + projection = await self.project_workspace( + workflow_ids=workflow_ids, + create_missing_mappings=False, + ) + mappings_by_source_id = { + mapping.source_id: mapping for mapping in await self._workflow_mappings() + } + for source_id, spec in projection.spec.workflows.items(): + mapping = mappings_by_source_id.get(source_id) + if mapping is None: + continue + projected_hash = stable_hash(spec) + mapping.last_projected_spec_hash = projected_hash + if mapping.last_synced_spec_hash != projected_hash: + self.logger.warning( + "Workspace sync postcondition mismatch: local projection " + "differs from the pulled spec", + source_id=source_id, + last_synced_spec_hash=mapping.last_synced_spec_hash, + projected_spec_hash=projected_hash, + ) + self.session.add(mapping) + + async def _rebaseline_convergent_mappings( + self, + *, + source_ids: set[str], + local_spec: WorkspaceSpec, + commit_sha: str, + ) -> None: + """Advance sync metadata for resources whose local and remote specs already agree.""" + if not source_ids: + return + mappings_by_source_id = { + mapping.source_id: mapping for mapping in await self._workflow_mappings() + } + for source_id in sorted(source_ids): + mapping = mappings_by_source_id.get(source_id) + workflow_spec = local_spec.workflows.get(source_id) + if mapping is None or workflow_spec is None: + continue + spec_hash = stable_hash(workflow_spec) + mapping.last_synced_commit_sha = commit_sha + mapping.last_synced_spec_hash = spec_hash + mapping.last_projected_spec_hash = spec_hash + mapping.sync_status = ResourceSyncStatus.SYNCED.value + self.session.add(mapping) + await self.session.flush() + async def _resolve_local_workflow_id(self, source_id: str) -> 
WorkflowUUID: stmt = select(WorkspaceSyncResourceMapping).where( WorkspaceSyncResourceMapping.workspace_id == self.workspace_id, @@ -783,7 +893,7 @@ async def _pending_changes_from_projection( for source_id, spec in sorted(projection.spec.workflows.items()): mapping = mappings_by_source_id.get(source_id) - before_hash = mapping.last_synced_spec_hash if mapping else None + before_hash = mapping.last_projected_spec_hash if mapping else None after_hash = stable_hash(spec) if before_hash == after_hash: continue @@ -841,30 +951,54 @@ async def _remote_changed_source_ids( def _classify_status( self, *, - base_spec_hash: str | None, - local_spec_hash: str, - remote_spec_hash: str | None, + has_base: bool, + has_remote: bool, local_changed_source_ids: set[str], remote_changed_source_ids: set[str], remote_diagnostics: list[PullDiagnostic], ) -> SyncStateStatus: if remote_diagnostics: return SyncStateStatus.ERROR - if base_spec_hash is None or remote_spec_hash is None: + if not has_base or not has_remote: return SyncStateStatus.NEVER_SYNCED - local_matches_base = local_spec_hash == base_spec_hash - remote_matches_base = remote_spec_hash == base_spec_hash - if local_matches_base and remote_matches_base: + if not local_changed_source_ids and not remote_changed_source_ids: return SyncStateStatus.CLEAN - if not local_matches_base and remote_matches_base: + if local_changed_source_ids and not remote_changed_source_ids: return SyncStateStatus.LOCAL_DIRTY - if local_matches_base and not remote_matches_base: + if not local_changed_source_ids and remote_changed_source_ids: return SyncStateStatus.REMOTE_AHEAD if local_changed_source_ids & remote_changed_source_ids: return SyncStateStatus.CONFLICTED return SyncStateStatus.DIVERGED + def _convergent_source_ids( + self, + *, + local_spec: WorkspaceSpec, + remote_spec: WorkspaceSpec | None, + candidates: set[str], + ) -> set[str]: + """Source ids changed on both sides whose local and remote specs are identical. 
+ + These need a rebaseline (advancing sync metadata), not a merge: the + typical case is a local change that was exported and merged back into + the base branch. + """ + if remote_spec is None or not candidates: + return set() + convergent: set[str] = set() + for source_id in candidates: + local = local_spec.workflows.get(source_id) + remote = remote_spec.workflows.get(source_id) + if ( + local is not None + and remote is not None + and stable_hash(local) == stable_hash(remote) + ): + convergent.add(source_id) + return convergent + async def _workflow_mappings(self) -> list[WorkspaceSyncResourceMapping]: stmt = select(WorkspaceSyncResourceMapping).where( WorkspaceSyncResourceMapping.workspace_id == self.workspace_id, @@ -950,7 +1084,7 @@ async def _create_changeset_for_specs( local_id=mapping.local_id if mapping else None, operation=( SyncOperation.CREATE.value - if mapping is None or mapping.last_synced_spec_hash is None + if mapping is None or mapping.last_projected_spec_hash is None else SyncOperation.UPDATE.value ), spec_hash=stable_hash(spec), From 28951a2047bc78c62c52c73909b8bebd418a17dd Mon Sep 17 00:00:00 2001 From: Daryl Lim <5508348+daryllimyt@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:46:28 +0800 Subject: [PATCH 2/6] fix(sync): harden Git transport and spec path handling - read_files fetches only the manifest and canonical workflow definition paths, via blob SHAs, and skips non-UTF-8 blobs instead of crashing - Strip and require a non-empty commit message in write_files - Fetch commit/branch pages off the event loop (PaginatedList iteration triggers lazy page requests) - Add is_workflow_definition_path predicate; non-canonical paths under the workflow root are ignored rather than parsed - Normalize slug inputs: whitespace-only aliases fall back to the title before the generic 'workflow' fallback --- tracecat/workspace_sync/git.py | 103 ++++++++++++++++++---------- tracecat/workspace_sync/workflow.py | 19 ++++- 2 files changed, 85 insertions(+), 
37 deletions(-) diff --git a/tracecat/workspace_sync/git.py b/tracecat/workspace_sync/git.py index 8bc3e6390..16079ad89 100644 --- a/tracecat/workspace_sync/git.py +++ b/tracecat/workspace_sync/git.py @@ -4,6 +4,7 @@ import asyncio import base64 +import itertools from dataclasses import dataclass from typing import Any @@ -17,6 +18,12 @@ from tracecat.service import BaseWorkspaceService from tracecat.sync import CommitInfo, PushStatus from tracecat.vcs.github.app import GitHubAppError, GitHubAppService +from tracecat.workspace_sync.schemas import ( + MANIFEST_FILENAME, + WORKFLOW_ROOT, + WorkspaceManifest, +) +from tracecat.workspace_sync.workflow import is_workflow_definition_path from tracecat.workspaces.service import WorkspaceService @@ -48,20 +55,41 @@ async def read_files( sha=commit.sha, recursive=True, ) + blob_shas = { + item.path: item.sha + for item in tree.tree + if item.type == "blob" and item.path + } + + async def fetch_text(path: str) -> str | None: + blob = await asyncio.to_thread(repo.get_git_blob, blob_shas[path]) + try: + return base64.b64decode(blob.content).decode("utf-8") + except UnicodeDecodeError: + return None + files: dict[str, str] = {} - for item in tree.tree: - if item.type != "blob" or not item.path: - continue - content_file = await asyncio.to_thread( - repo.get_contents, - item.path, - ref=commit.sha, - ) - if isinstance(content_file, list): + workflow_root = WORKFLOW_ROOT + if MANIFEST_FILENAME in blob_shas: + manifest_content = await fetch_text(MANIFEST_FILENAME) + if manifest_content is not None: + files[MANIFEST_FILENAME] = manifest_content + try: + manifest = WorkspaceManifest.model_validate_json( + manifest_content + ) + workflow_root = manifest.resources.workflows.strip("/") + except Exception: + # Invalid manifests surface as parse diagnostics later; + # fall back to the default workflow root here. 
+ pass + + for path in sorted(blob_shas): + if not is_workflow_definition_path(path, workflow_root=workflow_root): continue - files[item.path] = base64.b64decode(content_file.content).decode( - "utf-8" - ) + content = await fetch_text(path) + if content is not None: + files[path] = content return GitTreeSnapshot( commit_sha=commit.sha, tree_sha=getattr(commit.commit.tree, "sha", None), @@ -84,6 +112,9 @@ async def write_files( ) -> CommitInfo: if not files: raise ValueError("At least one file is required for workspace sync export") + message = message.strip() + if not message: + raise ValueError("A non-empty commit message is required") gh_svc = GitHubAppService(session=self.session, role=self.role) gh = await gh_svc.get_github_client_for_repo(url) try: @@ -211,21 +242,22 @@ async def list_commits( try: repo = await asyncio.to_thread(gh.get_repo, f"{url.org}/{url.repo}") commits_paginated = await asyncio.to_thread(repo.get_commits, sha=branch) - commits: list[GitCommitInfo] = [] - for index, commit in enumerate(commits_paginated): - if index >= limit: - break - commits.append( - GitCommitInfo( - sha=commit.sha, - message=commit.commit.message, - author=commit.commit.author.name or "Unknown", - author_email=commit.commit.author.email or "", - date=commit.commit.author.date.isoformat(), - tags=[], - ) + # Page fetches happen lazily during iteration; keep them off the + # event loop. 
+ raw_commits = await asyncio.to_thread( + lambda: list(itertools.islice(commits_paginated, limit)) + ) + return [ + GitCommitInfo( + sha=commit.sha, + message=commit.commit.message, + author=commit.commit.author.name or "Unknown", + author_email=commit.commit.author.email or "", + date=commit.commit.author.date.isoformat(), + tags=[], ) - return commits + for commit in raw_commits + ] except GithubException as e: raise GitHubAppError(f"GitHub API error: {e.status} - {e.data}") from e finally: @@ -242,17 +274,16 @@ async def list_branches( try: repo = await asyncio.to_thread(gh.get_repo, f"{url.org}/{url.repo}") branches_paginated = await asyncio.to_thread(repo.get_branches) - branches: list[GitBranchInfo] = [] - for index, branch_obj in enumerate(branches_paginated): - if index >= limit: - break - branches.append( - GitBranchInfo( - name=branch_obj.name, - is_default=branch_obj.name == repo.default_branch, - ) + raw_branches = await asyncio.to_thread( + lambda: list(itertools.islice(branches_paginated, limit)) + ) + return [ + GitBranchInfo( + name=branch_obj.name, + is_default=branch_obj.name == repo.default_branch, ) - return branches + for branch_obj in raw_branches + ] except GithubException as e: raise GitHubAppError(f"GitHub API error: {e.status} - {e.data}") from e finally: diff --git a/tracecat/workspace_sync/workflow.py b/tracecat/workspace_sync/workflow.py index 556033760..99ec85947 100644 --- a/tracecat/workspace_sync/workflow.py +++ b/tracecat/workspace_sync/workflow.py @@ -43,8 +43,25 @@ def workflow_source_id_from_path(path: str) -> str | None: return source_id or None +def is_workflow_definition_path( + path: str, *, workflow_root: str = WORKFLOW_ROOT +) -> bool: + """Return True for canonical `<workflow_root>/<source_id>/definition.yml` paths.""" + parts = path.strip("/").split("/") + return ( + len(parts) == 3 + and parts[0] == workflow_root + and bool(parts[1]) + and parts[2] == WORKFLOW_DEFINITION_FILENAME + ) + + def default_workflow_source_id(*, alias: str | None, 
title: str) -> str: - base = slugify(alias or title, separator="-") or "workflow" + base = ( + slugify((alias or "").strip(), separator="-") + or slugify(title.strip(), separator="-") + or "workflow" + ) return base[:96].strip("-") or "workflow" From d3cec9a5e3cfe48a3169505dc2e63b6376d8c2bb Mon Sep 17 00:00:00 2001 From: Daryl Lim <5508348+daryllimyt@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:46:28 +0800 Subject: [PATCH 3/6] feat(sync): surface local deletes and reconcile remote deletions on pull - Pending changes now include non-exportable DELETE entries for synced resources missing from the local projection, so status and counts stay honest - Introduce PullReconciliationPlan shared by pull and status: convergent resources (equal specs or deleted on both sides) rebaseline; resources deleted remotely but unchanged locally are untracked back into pending creates; resources deleted locally but changed remotely are re-imported (Git owns desired state); only divergent dual edits conflict - parse_files only parses canonical workflow definition paths --- tests/unit/test_workspace_sync.py | 261 ++++++++++++++++++++++++++++- tracecat/workspace_sync/service.py | 167 +++++++++++++++--- 2 files changed, 403 insertions(+), 25 deletions(-) diff --git a/tests/unit/test_workspace_sync.py b/tests/unit/test_workspace_sync.py index 8945d341b..e74d582d9 100644 --- a/tests/unit/test_workspace_sync.py +++ b/tests/unit/test_workspace_sync.py @@ -728,7 +728,10 @@ async def test_pull_handwritten_spec_without_webhook_stays_clean( "definition": sample_dsl.model_dump(mode="json", exclude_none=True), }, sort_keys=False, - ) + ), + # Non-spec files must be ignored, not parsed as workflows. 
+ "workflows/README.md": "# Workflow docs\n", + "workflows/examples/nested/notes.yml": "not: a workflow\n", } ) url = parse_git_url(TEST_REPO_URL, allowed_domains={"github.com"}) @@ -776,3 +779,259 @@ async def test_readonly_projection_dedupes_duplicate_slugs( creating = await service.project_workspace(create_missing_mappings=True) assert set(creating.spec.workflows) == set(readonly.spec.workflows) assert creating.spec_hash == readonly.spec_hash + + +async def _establish_synced_baseline( + *, + session: AsyncSession, + role: Role, + service: WorkspaceGitSyncService, + url, + source_id: str, +) -> None: + """Create a changeset, export it, simulate a merge, and pull to CLEAN.""" + changeset = await service.create_changeset( + params=ChangeSetCreate( + title="Export workflow", + resources=[ResourceRef(resource_type="workflow", source_id=source_id)], + ) + ) + await service.export_changeset( + changeset_id=changeset.id, + params=ChangeSetExport( + message="Export workflow", + branch=f"sync/{source_id}", + create_pr=False, + ), + ) + assert FakeGitHubSyncTransport.written_files is not None + FakeGitHubSyncTransport.files = dict(FakeGitHubSyncTransport.written_files) + result = await service.pull(url=url, options=PullOptions(commit_sha="a" * 40)) + assert result.success, result.message + status = await service.get_status() + assert status.status == "clean" + + +async def _delete_local_workflow(session: AsyncSession, *, alias: str) -> None: + workflow = await session.scalar(select(Workflow).where(Workflow.alias == alias)) + assert workflow is not None + await session.delete(workflow) + await session.commit() + + +@pytest.mark.anyio +@pytest.mark.usefixtures("db") +async def test_local_delete_surfaces_as_non_exportable_pending_change( + session: AsyncSession, + svc_role: Role, + svc_workspace, + sample_dsl: DSLInput, +) -> None: + svc_workspace.settings = {"git_repo_url": TEST_REPO_URL} + session.add(svc_workspace) + await _create_local_workflow( + session=session, 
role=svc_role, dsl=sample_dsl, alias="detect-okta-risk" + ) + _reset_fake_transport() + url = parse_git_url(TEST_REPO_URL, allowed_domains={"github.com"}) + + with ( + patch( + "tracecat.workspace_sync.service.WorkspaceGitHubSyncService", + FakeGitHubSyncTransport, + ), + _registry_lock_patch(), + ): + service = WorkspaceGitSyncService(session=session, role=svc_role) + await _establish_synced_baseline( + session=session, + role=svc_role, + service=service, + url=url, + source_id="detect-okta-risk", + ) + + await _delete_local_workflow(session, alias="detect-okta-risk") + + pending = await service.list_pending_changes() + assert len(pending.changes) == 1 + assert pending.changes[0].operation == "delete" + assert pending.changes[0].exportable is False + status = await service.get_status() + assert status.status == "local_dirty" + assert status.pending_change_count == 1 + + # Remote unchanged: pull succeeds and the delete stays pending. + result = await service.pull(url=url, options=PullOptions(commit_sha="b" * 40)) + assert result.success, result.message + pending = await service.list_pending_changes() + assert len(pending.changes) == 1 + assert pending.changes[0].operation == "delete" + + +@pytest.mark.anyio +@pytest.mark.usefixtures("db") +async def test_remote_delete_untracks_mapping_into_pending_create( + session: AsyncSession, + svc_role: Role, + svc_workspace, + sample_dsl: DSLInput, +) -> None: + svc_workspace.settings = {"git_repo_url": TEST_REPO_URL} + session.add(svc_workspace) + await _create_local_workflow( + session=session, role=svc_role, dsl=sample_dsl, alias="detect-okta-risk" + ) + _reset_fake_transport() + url = parse_git_url(TEST_REPO_URL, allowed_domains={"github.com"}) + + with ( + patch( + "tracecat.workspace_sync.service.WorkspaceGitHubSyncService", + FakeGitHubSyncTransport, + ), + _registry_lock_patch(), + ): + service = WorkspaceGitSyncService(session=session, role=svc_role) + await _establish_synced_baseline( + session=session, + 
role=svc_role, + service=service, + url=url, + source_id="detect-okta-risk", + ) + + # Someone deletes the workflow file in the repository. + FakeGitHubSyncTransport.files = { + path: content + for path, content in FakeGitHubSyncTransport.files.items() + if path == "tracecat.json" + } + status = await service.get_status() + assert status.status == "remote_ahead" + + result = await service.pull(url=url, options=PullOptions(commit_sha="b" * 40)) + assert result.success, result.message + + # The local workflow survives but is untracked: a pending create. + mapping = await session.scalar( + select(WorkspaceSyncResourceMapping).where( + WorkspaceSyncResourceMapping.workspace_id == svc_role.workspace_id + ) + ) + assert mapping is not None + assert mapping.last_synced_spec_hash is None + assert mapping.sync_status == "untracked" + pending = await service.list_pending_changes() + assert len(pending.changes) == 1 + assert pending.changes[0].operation == "create" + status = await service.get_status() + assert status.status == "local_dirty" + + +@pytest.mark.anyio +@pytest.mark.usefixtures("db") +async def test_delete_on_both_sides_rebaselines_clean( + session: AsyncSession, + svc_role: Role, + svc_workspace, + sample_dsl: DSLInput, +) -> None: + svc_workspace.settings = {"git_repo_url": TEST_REPO_URL} + session.add(svc_workspace) + await _create_local_workflow( + session=session, role=svc_role, dsl=sample_dsl, alias="detect-okta-risk" + ) + _reset_fake_transport() + url = parse_git_url(TEST_REPO_URL, allowed_domains={"github.com"}) + + with ( + patch( + "tracecat.workspace_sync.service.WorkspaceGitHubSyncService", + FakeGitHubSyncTransport, + ), + _registry_lock_patch(), + ): + service = WorkspaceGitSyncService(session=session, role=svc_role) + await _establish_synced_baseline( + session=session, + role=svc_role, + service=service, + url=url, + source_id="detect-okta-risk", + ) + + await _delete_local_workflow(session, alias="detect-okta-risk") + 
FakeGitHubSyncTransport.files = { + path: content + for path, content in FakeGitHubSyncTransport.files.items() + if path == "tracecat.json" + } + + result = await service.pull(url=url, options=PullOptions(commit_sha="b" * 40)) + assert result.success, result.message + + mapping = await session.scalar( + select(WorkspaceSyncResourceMapping).where( + WorkspaceSyncResourceMapping.workspace_id == svc_role.workspace_id + ) + ) + assert mapping is None + status = await service.get_status() + assert status.status == "clean" + assert (await service.list_pending_changes()).changes == [] + + +@pytest.mark.anyio +@pytest.mark.usefixtures("db") +async def test_remote_change_resurrects_locally_deleted_workflow( + session: AsyncSession, + svc_role: Role, + svc_workspace, + sample_dsl: DSLInput, +) -> None: + """Git owns desired state: a remote edit re-imports a locally deleted workflow.""" + svc_workspace.settings = {"git_repo_url": TEST_REPO_URL} + session.add(svc_workspace) + await _create_local_workflow( + session=session, role=svc_role, dsl=sample_dsl, alias="detect-okta-risk" + ) + _reset_fake_transport() + url = parse_git_url(TEST_REPO_URL, allowed_domains={"github.com"}) + + with ( + patch( + "tracecat.workspace_sync.service.WorkspaceGitHubSyncService", + FakeGitHubSyncTransport, + ), + _registry_lock_patch(), + ): + service = WorkspaceGitSyncService(session=session, role=svc_role) + await _establish_synced_baseline( + session=session, + role=svc_role, + service=service, + url=url, + source_id="detect-okta-risk", + ) + + await _delete_local_workflow(session, alias="detect-okta-risk") + path = "workflows/detect-okta-risk/definition.yml" + remote = yaml.safe_load(FakeGitHubSyncTransport.files[path]) + remote["definition"]["description"] = "Edited in GitHub" + FakeGitHubSyncTransport.files = { + **FakeGitHubSyncTransport.files, + path: yaml.safe_dump(remote, sort_keys=False), + } + + result = await service.pull(url=url, options=PullOptions(commit_sha="b" * 40)) + assert 
result.success, result.message + assert result.workflows_imported == 1 + + workflow = await session.scalar( + select(Workflow).where(Workflow.alias == "detect-okta-risk") + ) + assert workflow is not None + status = await service.get_status() + assert status.status == "clean" + assert (await service.list_pending_changes()).changes == [] diff --git a/tracecat/workspace_sync/service.py b/tracecat/workspace_sync/service.py index 1efc24831..4afc3c2d2 100644 --- a/tracecat/workspace_sync/service.py +++ b/tracecat/workspace_sync/service.py @@ -4,6 +4,7 @@ import uuid from collections.abc import Sequence +from dataclasses import dataclass from datetime import UTC, datetime from typing import Any @@ -68,6 +69,7 @@ ) from tracecat.workspace_sync.workflow import ( default_workflow_source_id, + is_workflow_definition_path, parse_workflow_spec, serialize_workflow_spec, workflow_source_path, @@ -77,6 +79,21 @@ from tracecat.workspaces.service import WorkspaceService +@dataclass(frozen=True) +class PullReconciliationPlan: + """Per-resource change sets driving pull reconciliation and status.""" + + local_changed: set[str] + local_deleted: set[str] + remote_changed: set[str] + remote_deleted: set[str] + convergent: set[str] + resurrect: set[str] + conflicts: list[str] + to_import: set[str] + untrack: set[str] + + class WorkspaceGitSyncService(BaseWorkspaceService): """Workspace-level Git sync service.""" @@ -174,7 +191,7 @@ async def parse_files( workflow_root = manifest.resources.workflows.strip("/") workflows: dict[str, WorkflowResourceSpec] = {} for path, content in sorted(files.items()): - if not path.startswith(f"{workflow_root}/"): + if not is_workflow_definition_path(path, workflow_root=workflow_root): continue spec, diagnostic = parse_workflow_spec(path, content) if diagnostic is not None: @@ -252,15 +269,13 @@ async def pull( projection=local_projection, state=state, ) - local_changed = {change.source_id for change in pending.changes} - remote_changed = await 
self._remote_changed_source_ids(snapshot) - convergent = self._convergent_source_ids( + plan = self._plan_pull( + pending=pending, local_spec=local_projection.spec, remote_spec=snapshot.spec, - candidates=local_changed & remote_changed, + remote_changed=await self._remote_changed_source_ids(snapshot), ) - conflicts = sorted((local_changed & remote_changed) - convergent) - if conflicts: + if plan.conflicts: return PullResult( success=False, commit_sha=snapshot.commit_sha, @@ -278,16 +293,16 @@ async def pull( ), details={"source_id": source_id}, ) - for source_id in conflicts + for source_id in plan.conflicts ], - message=f"Pull blocked by {len(conflicts)} conflicting resource(s)", + message=f"Pull blocked by {len(plan.conflicts)} conflicting resource(s)", ) spec_to_apply = WorkspaceSpec( workflows={ source_id: workflow_spec for source_id, workflow_spec in snapshot.spec.workflows.items() - if source_id in remote_changed and source_id not in convergent + if source_id in plan.to_import } ) result = await self._reconcile_workflow_specs( @@ -298,10 +313,11 @@ async def pull( return result await self._rebaseline_convergent_mappings( - source_ids=convergent, + source_ids=plan.convergent, local_spec=local_projection.spec, commit_sha=snapshot.commit_sha, ) + await self._untrack_remote_deleted_mappings(source_ids=plan.untrack) await self._record_successful_pull( state=state, snapshot=snapshot, @@ -348,19 +364,22 @@ async def get_status(self) -> WorkspaceSyncStatus: projection=local_projection, state=state, ) - local_changed = {change.source_id for change in pending.changes} - remote_changed = await self._remote_changed_source_ids(remote_snapshot) - convergent = self._convergent_source_ids( + plan = self._plan_pull( + pending=pending, local_spec=local_projection.spec, - remote_spec=remote_snapshot.spec if remote_snapshot else None, - candidates=local_changed & remote_changed, + remote_spec=remote_snapshot.spec if remote_snapshot else WorkspaceSpec(), + remote_changed=await 
self._remote_changed_source_ids(remote_snapshot), ) remote_spec_hash = remote_snapshot.spec_hash if remote_snapshot else None status = self._classify_status( has_base=state.base_commit_sha is not None, has_remote=remote_snapshot is not None, - local_changed_source_ids=local_changed - convergent, - remote_changed_source_ids=remote_changed, + local_changed_source_ids=( + (plan.local_changed | plan.local_deleted) + - plan.convergent + - plan.resurrect + ), + remote_changed_source_ids=plan.remote_changed, remote_diagnostics=diagnostics, ) @@ -779,8 +798,12 @@ async def _rebaseline_convergent_mappings( } for source_id in sorted(source_ids): mapping = mappings_by_source_id.get(source_id) + if mapping is None: + continue workflow_spec = local_spec.workflows.get(source_id) - if mapping is None or workflow_spec is None: + if workflow_spec is None: + # Deleted on both sides: the mapping no longer identifies anything. + await self.session.delete(mapping) continue spec_hash = stable_hash(workflow_spec) mapping.last_synced_commit_sha = commit_sha @@ -790,6 +813,29 @@ async def _rebaseline_convergent_mappings( self.session.add(mapping) await self.session.flush() + async def _untrack_remote_deleted_mappings(self, *, source_ids: set[str]) -> None: + """Detach mappings for resources deleted remotely but kept locally. + + With the ignore-missing delete policy the local resource survives the + pull; clearing sync metadata turns it back into a pending create so the + admin can re-export it or delete it explicitly. 
+ """ + if not source_ids: + return + mappings_by_source_id = { + mapping.source_id: mapping for mapping in await self._workflow_mappings() + } + for source_id in sorted(source_ids): + mapping = mappings_by_source_id.get(source_id) + if mapping is None: + continue + mapping.last_synced_commit_sha = None + mapping.last_synced_spec_hash = None + mapping.last_projected_spec_hash = None + mapping.sync_status = ResourceSyncStatus.UNTRACKED.value + self.session.add(mapping) + await self.session.flush() + async def _resolve_local_workflow_id(self, source_id: str) -> WorkflowUUID: stmt = select(WorkspaceSyncResourceMapping).where( WorkspaceSyncResourceMapping.workspace_id == self.workspace_id, @@ -915,6 +961,32 @@ async def _pending_changes_from_projection( ) ) + # Previously synced resources missing from the projection were deleted + # locally. Deletes are not exportable in v1 but must surface so status + # and pending counts stay honest. + projected_source_ids = set(projection.spec.workflows) + for mapping in mappings: + if mapping.source_id in projected_source_ids: + continue + if mapping.last_projected_spec_hash is None: + continue + changes.append( + WorkspaceSyncPendingChange( + resource_type=SyncResourceType.WORKFLOW.value, + source_id=mapping.source_id, + source_path=mapping.source_path + or workflow_source_path(mapping.source_id), + local_id=mapping.local_id, + operation=SyncOperation.DELETE, + title=None, + alias=None, + before_spec_hash=mapping.last_projected_spec_hash, + after_spec_hash=None, + exportable=False, + ) + ) + changes.sort(key=lambda change: change.source_id) + return WorkspaceSyncPendingChanges( base_spec_hash=state.base_spec_hash, local_spec_hash=projection.spec_hash, @@ -972,6 +1044,51 @@ def _classify_status( return SyncStateStatus.CONFLICTED return SyncStateStatus.DIVERGED + def _plan_pull( + self, + *, + pending: WorkspaceSyncPendingChanges, + local_spec: WorkspaceSpec, + remote_spec: WorkspaceSpec, + remote_changed: set[str], + ) -> 
PullReconciliationPlan: + """Build the per-resource reconciliation plan shared by pull and status. + + Policy (v1): convergent resources rebaseline; resources deleted locally + but changed remotely are re-imported (Git owns desired state); resources + deleted remotely but unchanged locally are untracked; everything changed + on both sides with differing specs is a conflict. + """ + local_deleted = { + change.source_id + for change in pending.changes + if change.operation == SyncOperation.DELETE + } + local_changed = {change.source_id for change in pending.changes} - local_deleted + remote_present = set(remote_spec.workflows) + remote_deleted = remote_changed - remote_present + overlap = (local_changed | local_deleted) & remote_changed + convergent = self._convergent_source_ids( + local_spec=local_spec, + remote_spec=remote_spec, + candidates=overlap, + ) + resurrect = (local_deleted & remote_changed & remote_present) - convergent + conflicts = sorted(overlap - convergent - resurrect) + to_import = (remote_changed & remote_present) - convergent + untrack = remote_deleted - local_deleted + return PullReconciliationPlan( + local_changed=local_changed, + local_deleted=local_deleted, + remote_changed=remote_changed, + remote_deleted=remote_deleted, + convergent=convergent, + resurrect=resurrect, + conflicts=conflicts, + to_import=to_import, + untrack=untrack, + ) + def _convergent_source_ids( self, *, @@ -979,11 +1096,11 @@ def _convergent_source_ids( remote_spec: WorkspaceSpec | None, candidates: set[str], ) -> set[str]: - """Source ids changed on both sides whose local and remote specs are identical. + """Source ids changed on both sides where no merge is required. - These need a rebaseline (advancing sync metadata), not a merge: the - typical case is a local change that was exported and merged back into - the base branch. + Covers identical local/remote specs (the typical exported-then-merged + change) and resources deleted on both sides. 
These rebaseline instead + of conflicting. """ if remote_spec is None or not candidates: return set() @@ -991,7 +1108,9 @@ def _convergent_source_ids( for source_id in candidates: local = local_spec.workflows.get(source_id) remote = remote_spec.workflows.get(source_id) - if ( + if local is None and remote is None: + convergent.add(source_id) + elif ( local is not None and remote is not None and stable_hash(local) == stable_hash(remote) From 16dd01c6b3ea84b13cad2922ecad659a9206ee1b Mon Sep 17 00:00:00 2001 From: Daryl Lim <5508348+daryllimyt@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:46:28 +0800 Subject: [PATCH 4/6] fix(sync): validate changeset title, export message, and branch params - Strip and reject whitespace-only ChangeSetCreate.title and ChangeSetExport.message - Validate pr_base_branch when provided (empty string no longer silently falls back to the default base branch) - Add a random suffix to legacy auto-generated publish branch names to avoid same-second collisions --- tracecat/workflow/store/router.py | 2 +- tracecat/workflow/store/service.py | 3 ++- tracecat/workspace_sync/schemas.py | 18 +++++++++++++++++- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/tracecat/workflow/store/router.py b/tracecat/workflow/store/router.py index 1dfe3e418..eb9e0fd8a 100644 --- a/tracecat/workflow/store/router.py +++ b/tracecat/workflow/store/router.py @@ -388,7 +388,7 @@ async def export_workspace_sync_changeset( ) try: validate_short_branch_name(params.branch, field_name="branch") - if params.pr_base_branch: + if params.pr_base_branch is not None: validate_short_branch_name( params.pr_base_branch, field_name="pr_base_branch", diff --git a/tracecat/workflow/store/service.py b/tracecat/workflow/store/service.py index af72c6f70..a312309f6 100644 --- a/tracecat/workflow/store/service.py +++ b/tracecat/workflow/store/service.py @@ -1,3 +1,4 @@ +import uuid from datetime import UTC, datetime from pathlib import Path @@ -68,7 +69,7 @@ async def 
publish_workflow_dsl( workspace_id=str(self.workspace_id), ) validated_branch = validate_short_branch_name( - f"tracecat-sync-{datetime.now(UTC).strftime('%Y%m%d-%H%M%S')}", + f"tracecat-sync-{datetime.now(UTC).strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:6]}", field_name="branch", ) create_pr = True diff --git a/tracecat/workspace_sync/schemas.py b/tracecat/workspace_sync/schemas.py index aee7c0176..11ae9bef9 100644 --- a/tracecat/workspace_sync/schemas.py +++ b/tracecat/workspace_sync/schemas.py @@ -132,13 +132,29 @@ class ChangeSetCreate(BaseModel): description: str | None = None resources: list[ResourceRef] + @field_validator("title") + @classmethod + def validate_title(cls, value: str) -> str: + cleaned = value.strip() + if not cleaned: + raise ValueError("title cannot be empty or whitespace") + return cleaned + class ChangeSetExport(BaseModel): - message: str + message: str = Field(min_length=1) branch: str create_pr: bool = False pr_base_branch: str | None = None + @field_validator("message") + @classmethod + def validate_message(cls, value: str) -> str: + cleaned = value.strip() + if not cleaned: + raise ValueError("message cannot be empty or whitespace") + return cleaned + class WorkspaceSyncPendingChange(BaseModel): resource_type: str From e2c0ee4cd712530559007257797fdc7abfe5e8f1 Mon Sep 17 00:00:00 2001 From: Daryl Lim <5508348+daryllimyt@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:46:28 +0800 Subject: [PATCH 5/6] chore(db): drop redundant unique constraints on sync table id columns The unique index on id already enforces uniqueness; the extra UNIQUE constraint created a second identical index on every sync table. 
--- .../versions/25f4e2a1c9d8_add_workspace_git_sync_tables.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/alembic/versions/25f4e2a1c9d8_add_workspace_git_sync_tables.py b/alembic/versions/25f4e2a1c9d8_add_workspace_git_sync_tables.py index 1e7ef46aa..e9bbebd25 100644 --- a/alembic/versions/25f4e2a1c9d8_add_workspace_git_sync_tables.py +++ b/alembic/versions/25f4e2a1c9d8_add_workspace_git_sync_tables.py @@ -106,7 +106,6 @@ def upgrade() -> None: *_timestamps(), *_tenant_fks("workspace_sync_state"), sa.PrimaryKeyConstraint("surrogate_id", name=op.f("pk_workspace_sync_state")), - sa.UniqueConstraint("id", name=op.f("uq_workspace_sync_state_id")), sa.UniqueConstraint( "workspace_id", "provider", @@ -153,7 +152,6 @@ def upgrade() -> None: sa.PrimaryKeyConstraint( "surrogate_id", name=op.f("pk_workspace_sync_resource_mapping") ), - sa.UniqueConstraint("id", name=op.f("uq_workspace_sync_resource_mapping_id")), sa.UniqueConstraint( "workspace_id", "provider", @@ -232,7 +230,6 @@ def upgrade() -> None: sa.PrimaryKeyConstraint( "surrogate_id", name=op.f("pk_workspace_sync_changeset") ), - sa.UniqueConstraint("id", name=op.f("uq_workspace_sync_changeset_id")), ) op.create_index( op.f("ix_workspace_sync_changeset_id"), @@ -277,7 +274,6 @@ def upgrade() -> None: sa.PrimaryKeyConstraint( "surrogate_id", name=op.f("pk_workspace_sync_changeset_item") ), - sa.UniqueConstraint("id", name=op.f("uq_workspace_sync_changeset_item_id")), sa.UniqueConstraint( "changeset_id", "resource_type", @@ -342,7 +338,6 @@ def upgrade() -> None: sa.PrimaryKeyConstraint( "surrogate_id", name=op.f("pk_workspace_sync_materialization") ), - sa.UniqueConstraint("id", name=op.f("uq_workspace_sync_materialization_id")), ) op.create_index( op.f("ix_workspace_sync_materialization_id"), From ee74c72335694ee53197f3a8d3e4d14be109bd49 Mon Sep 17 00:00:00 2001 From: Daryl Lim <5508348+daryllimyt@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:46:28 +0800 Subject: [PATCH 6/6] fix(ui): limit staging 
selection to exportable changes --- .../organization/workspace-sync-staging.tsx | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/frontend/src/components/organization/workspace-sync-staging.tsx b/frontend/src/components/organization/workspace-sync-staging.tsx index 844285b2b..db3a85ce2 100644 --- a/frontend/src/components/organization/workspace-sync-staging.tsx +++ b/frontend/src/components/organization/workspace-sync-staging.tsx @@ -120,16 +120,22 @@ export function WorkspaceSyncStaging({ if (!pendingQuery.data) { return } - const sourceIds = new Set(changes.map((change) => change.source_id)) + const exportableSourceIds = new Set( + changes + .filter((change) => change.exportable) + .map((change) => change.source_id) + ) setSelectedSourceIds((current) => { - if (sourceIds.size === 0) { - return sourceIds + if (exportableSourceIds.size === 0) { + return exportableSourceIds } if (!selectionInitializedRef.current) { selectionInitializedRef.current = true - return sourceIds + return exportableSourceIds } - return new Set([...current].filter((sourceId) => sourceIds.has(sourceId))) + return new Set( + [...current].filter((sourceId) => exportableSourceIds.has(sourceId)) + ) }) }, [changes, pendingQuery.data]) @@ -148,8 +154,8 @@ export function WorkspaceSyncStaging({ ) } - const selectedChanges = changes.filter((change) => - selectedSourceIds.has(change.source_id) + const selectedChanges = changes.filter( + (change) => change.exportable && selectedSourceIds.has(change.source_id) ) const isBusy = createChangeset.isPending || exportChangeset.isPending const canExport =