diff --git a/service/worker/deletenamespace/deleteexecutions/activities.go b/service/worker/deletenamespace/deleteexecutions/activities.go index 6cfc666b352..bf5b45fbf4e 100644 --- a/service/worker/deletenamespace/deleteexecutions/activities.go +++ b/service/worker/deletenamespace/deleteexecutions/activities.go @@ -43,6 +43,7 @@ type ( GetNextPageTokenParams struct { Namespace namespace.Name NamespaceID namespace.ID + Query string PageSize int NextPageToken []byte } @@ -50,6 +51,7 @@ type ( DeleteExecutionsActivityParams struct { Namespace namespace.Name NamespaceID namespace.ID + Query string // Deprecated. // TODO: remove after 1.27 release. RPS int @@ -99,7 +101,7 @@ func (a *LocalActivities) GetNextPageTokenActivity(ctx context.Context, params G Namespace: params.Namespace, PageSize: params.PageSize, NextPageToken: params.NextPageToken, - Query: sadefs.QueryWithAnyNamespaceDivision(""), + Query: sadefs.QueryWithAnyNamespaceDivision(params.Query), } resp, err := a.visibilityManager.ListWorkflowExecutions(ctx, req) @@ -160,7 +162,7 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete Namespace: params.Namespace, PageSize: params.ListPageSize, NextPageToken: params.NextPageToken, - Query: sadefs.QueryWithAnyNamespaceDivision(""), + Query: sadefs.QueryWithAnyNamespaceDivision(params.Query), } resp, err := a.visibilityManager.ListWorkflowExecutions(ctx, req) if err != nil { diff --git a/service/worker/deletenamespace/deleteexecutions/workflow.go b/service/worker/deletenamespace/deleteexecutions/workflow.go index 39696d5f4b2..536de21eef5 100644 --- a/service/worker/deletenamespace/deleteexecutions/workflow.go +++ b/service/worker/deletenamespace/deleteexecutions/workflow.go @@ -21,6 +21,7 @@ type ( DeleteExecutionsParams struct { Namespace namespace.Name NamespaceID namespace.ID + Query string Config DeleteExecutionsConfig // Number of Workflow Executions to delete. Used in statistics computation. @@ -149,6 +150,7 @@ func DeleteExecutionsWorkflow(ctx workflow.Context, params DeleteExecutionsParam deleteExecutionsFuture := workflow.ExecuteActivity(ctx1, a.DeleteExecutionsActivity, &DeleteExecutionsActivityParams{ Namespace: params.Namespace, NamespaceID: params.NamespaceID, + Query: params.Query, RPS: params.Config.DeleteActivityRPS, ListPageSize: params.Config.PageSize, NextPageToken: nextPageToken, @@ -158,6 +160,7 @@ func DeleteExecutionsWorkflow(ctx workflow.Context, params DeleteExecutionsParam err := workflow.ExecuteLocalActivity(ctx2, la.GetNextPageTokenActivity, GetNextPageTokenParams{ NamespaceID: params.NamespaceID, Namespace: params.Namespace, + Query: params.Query, PageSize: params.Config.PageSize, NextPageToken: nextPageToken, }).Get(ctx, &nextPageToken) diff --git a/service/worker/deletenamespace/deleteexecutions/workflow_test.go b/service/worker/deletenamespace/deleteexecutions/workflow_test.go index 9c72e3e2826..78ae2cde657 100644 --- a/service/worker/deletenamespace/deleteexecutions/workflow_test.go +++ b/service/worker/deletenamespace/deleteexecutions/workflow_test.go @@ -31,6 +31,8 @@ import ( "go.uber.org/mock/gomock" ) +const testDeleteExecutionsQuery = "WorkflowId = 'abc'" + func Test_DeleteExecutionsWorkflow_Success(t *testing.T) { testSuite := &testsuite.WorkflowTestSuite{} testSuite.SetLogger(log.NewSdkLogger(log.NewTestLogger())) @@ -42,6 +44,7 @@ func Test_DeleteExecutionsWorkflow_Success(t *testing.T) { env.OnActivity(la.GetNextPageTokenActivity, mock.Anything, GetNextPageTokenParams{ Namespace: "namespace", NamespaceID: "namespace-id", + Query: testDeleteExecutionsQuery, PageSize: 1000, NextPageToken: nil, }).Return(nil, nil).Once() @@ -49,6 +52,7 @@ func Test_DeleteExecutionsWorkflow_Success(t *testing.T) { env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, DeleteExecutionsActivityParams{ Namespace: "namespace", NamespaceID: "namespace-id", + Query: testDeleteExecutionsQuery, RPS: 100, ListPageSize: 1000, NextPageToken: nil, @@ -60,6 +64,7 @@ func Test_DeleteExecutionsWorkflow_Success(t *testing.T) { env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{ NamespaceID: "namespace-id", Namespace: "namespace", + Query: testDeleteExecutionsQuery, }) require.True(t, env.IsWorkflowCompleted()) @@ -82,7 +87,7 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_NoExecutions(t *testing.T) { Namespace: "namespace", PageSize: 1000, NextPageToken: nil, - Query: sadefs.QueryWithAnyNamespaceDivision(""), + Query: sadefs.QueryWithAnyNamespaceDivision(testDeleteExecutionsQuery), }).Return(&manager.ListWorkflowExecutionsResponse{ Executions: nil, NextPageToken: nil, @@ -107,6 +112,7 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_NoExecutions(t *testing.T) { env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{ NamespaceID: "namespace-id", Namespace: "namespace", + Query: testDeleteExecutionsQuery, }) require.True(t, env.IsWorkflowCompleted()) @@ -130,6 +136,7 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_NoContinueAsNew(t *testing.T) env.OnActivity(la.GetNextPageTokenActivity, mock.Anything, mock.Anything).Return(func(_ context.Context, params GetNextPageTokenParams) ([]byte, error) { require.Equal(t, namespace.Name("namespace"), params.Namespace) require.Equal(t, namespace.ID("namespace-id"), params.NamespaceID) + require.Equal(t, testDeleteExecutionsQuery, params.Query) require.Equal(t, 3, params.PageSize) if pageNumber.Load() == 0 { require.Nil(t, params.NextPageToken) @@ -147,6 +154,7 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_NoContinueAsNew(t *testing.T) env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(func(_ context.Context, params DeleteExecutionsActivityParams) (DeleteExecutionsActivityResult, error) { require.Equal(t, namespace.Name("namespace"), params.Namespace) require.Equal(t, namespace.ID("namespace-id"), params.NamespaceID) + require.Equal(t, testDeleteExecutionsQuery, params.Query) require.Equal(t, 3, params.ListPageSize) if params.NextPageToken == nil { nilTokenOnce.Store(true) @@ -163,6 +171,7 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_NoContinueAsNew(t *testing.T) env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{ NamespaceID: "namespace-id", Namespace: "namespace", + Query: testDeleteExecutionsQuery, Config: DeleteExecutionsConfig{ PageSize: 3, }, @@ -190,6 +199,7 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_ContinueAsNew(t *testing.T) { env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{ NamespaceID: "namespace-id", Namespace: "namespace", + Query: "", Config: DeleteExecutionsConfig{ PageSize: 3, PagesPerExecution: 78, @@ -209,6 +219,7 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_ContinueAsNew(t *testing.T) { require.Equal(t, 78, newWfParams.PreviousSuccessCount) require.Equal(t, 0, newWfParams.PreviousErrorCount) require.Equal(t, []byte{3, 22, 83}, newWfParams.NextPageToken) + require.Empty(t, newWfParams.Query) } func Test_DeleteExecutionsWorkflow_ManyExecutions_ActivityError(t *testing.T) { @@ -220,15 +231,22 @@ func Test_DeleteExecutionsWorkflow_ManyExecutions_ActivityError(t *testing.T) { var la *LocalActivities env.OnActivity(la.GetNextPageTokenActivity, mock.Anything, mock.Anything). - Return([]byte{3, 22, 83}, nil). + Return(func(_ context.Context, params GetNextPageTokenParams) ([]byte, error) { + require.Equal(t, testDeleteExecutionsQuery, params.Query) + return []byte{3, 22, 83}, nil + }). Times(40) // GoSDK defaultMaximumAttemptsForUnitTest value * defaultConcurrentDeleteExecutionsActivities. env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything). - Return(DeleteExecutionsActivityResult{}, serviceerror.NewUnavailable("specific_error_from_activity")). + Return(func(_ context.Context, params DeleteExecutionsActivityParams) (DeleteExecutionsActivityResult, error) { + require.Equal(t, testDeleteExecutionsQuery, params.Query) + return DeleteExecutionsActivityResult{}, serviceerror.NewUnavailable("specific_error_from_activity") + }). Times(40) // GoSDK defaultMaximumAttemptsForUnitTest value * defaultConcurrentDeleteExecutionsActivities. env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{ NamespaceID: "namespace-id", Namespace: "namespace", + Query: testDeleteExecutionsQuery, Config: DeleteExecutionsConfig{ PageSize: 3, }, @@ -255,7 +273,7 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_ManyExecutions(t *testing.T) Namespace: "namespace", PageSize: 2, NextPageToken: nil, - Query: sadefs.QueryWithAnyNamespaceDivision(""), + Query: sadefs.QueryWithAnyNamespaceDivision(testDeleteExecutionsQuery), }).Return(&manager.ListWorkflowExecutionsResponse{ Executions: []*workflowpb.WorkflowExecutionInfo{ { @@ -282,7 +300,7 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_ManyExecutions(t *testing.T) Namespace: "namespace", PageSize: 2, NextPageToken: []byte{22, 8, 78}, - Query: sadefs.QueryWithAnyNamespaceDivision(""), + Query: sadefs.QueryWithAnyNamespaceDivision(testDeleteExecutionsQuery), }).Return(&manager.ListWorkflowExecutionsResponse{ Executions: []*workflowpb.WorkflowExecutionInfo{ { @@ -333,6 +351,7 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_ManyExecutions(t *testing.T) env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{ NamespaceID: "namespace-id", Namespace: "namespace", + Query: testDeleteExecutionsQuery, Config: DeleteExecutionsConfig{ PageSize: 2, }, @@ -431,6 +450,7 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_ChasmExecutions(t *testing.T) env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{ NamespaceID: "namespace-id", Namespace: "namespace", + Query: "", Config: DeleteExecutionsConfig{ PageSize: 2, }, @@ -458,7 +478,7 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_HistoryClientError(t *testing Namespace: "namespace", PageSize: 2, NextPageToken: nil, - Query: sadefs.QueryWithAnyNamespaceDivision(""), + Query: sadefs.QueryWithAnyNamespaceDivision(testDeleteExecutionsQuery), }).Return(&manager.ListWorkflowExecutionsResponse{ Executions: []*workflowpb.WorkflowExecutionInfo{ { @@ -485,7 +505,7 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_HistoryClientError(t *testing Namespace: "namespace", PageSize: 2, NextPageToken: []byte{22, 8, 78}, - Query: sadefs.QueryWithAnyNamespaceDivision(""), + Query: sadefs.QueryWithAnyNamespaceDivision(testDeleteExecutionsQuery), }).Return(&manager.ListWorkflowExecutionsResponse{ Executions: []*workflowpb.WorkflowExecutionInfo{ { @@ -530,6 +550,7 @@ func Test_DeleteExecutionsWorkflow_NoActivityMocks_HistoryClientError(t *testing env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{ NamespaceID: "namespace-id", Namespace: "namespace", + Query: testDeleteExecutionsQuery, Config: DeleteExecutionsConfig{ PageSize: 2, }, @@ -555,6 +576,7 @@ func Test_DeleteExecutionsWorkflow_QueryStats(t *testing.T) { pageNumber := 0 env.OnActivity(la.GetNextPageTokenActivity, mock.Anything, mock.Anything).Return(func(_ context.Context, params GetNextPageTokenParams) ([]byte, error) { + require.Equal(t, testDeleteExecutionsQuery, params.Query) pageNumber++ if pageNumber == 4 { // Emulate 100 pages of executions. @@ -564,6 +586,7 @@ func Test_DeleteExecutionsWorkflow_QueryStats(t *testing.T) { }).Times(4) env.OnActivity(a.DeleteExecutionsActivity, mock.Anything, mock.Anything).Return(func(_ context.Context, params DeleteExecutionsActivityParams) (DeleteExecutionsActivityResult, error) { + require.Equal(t, testDeleteExecutionsQuery, params.Query) return DeleteExecutionsActivityResult{ ErrorCount: 10, SuccessCount: 220, @@ -598,6 +621,7 @@ func Test_DeleteExecutionsWorkflow_QueryStats(t *testing.T) { env.ExecuteWorkflow(DeleteExecutionsWorkflow, DeleteExecutionsParams{ NamespaceID: "namespace-id", Namespace: "namespace", + Query: testDeleteExecutionsQuery, Config: DeleteExecutionsConfig{ ConcurrentDeleteExecutionsActivities: 1, // To linearize the execution of activities. },