diff --git a/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java b/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java index bff1809f92..86308d149d 100644 --- a/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java +++ b/digdag-core/src/main/java/io/digdag/core/agent/RequireOperatorFactory.java @@ -23,12 +23,15 @@ import java.time.Instant; import java.util.UUID; +import static java.lang.Math.abs; import static java.util.Locale.ENGLISH; public class RequireOperatorFactory implements OperatorFactory { private static final int MAX_TASK_RETRY_INTERVAL = 10; + // ToDo configurable in server config to run test solidly + private static final int DELAY_SECONDS_KICK_IN_RESOURCE_LIMIT = 60 * 10; // 10 min. private static Logger logger = LoggerFactory.getLogger(RequireOperatorFactory.class); @@ -142,7 +145,8 @@ public TaskResult runTask() projectIdentifier.transform(ProjectIdentifier::toString).or(""), workflowName)); } catch (ResourceLimitExceededException ex) { - throw new TaskExecutionException(ex); + logger.warn("Number of attempts or tasks exceed limit. Retry {} seconds later", DELAY_SECONDS_KICK_IN_RESOURCE_LIMIT); + throw TaskExecutionException.ofNextPolling(DELAY_SECONDS_KICK_IN_RESOURCE_LIMIT, ConfigElement.copyOf(lastStateParams)); } } diff --git a/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java b/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java index 35c8f8309d..08639f8b68 100644 --- a/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java +++ b/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java @@ -1562,12 +1562,23 @@ public StoredSessionAttempt insertAttempt(long sessionId, int projId, SessionAtt throws ResourceConflictException, ResourceNotFoundException { long attemptId = catchForeignKeyNotFound(() -> - catchConflict(() -> - dao.insertAttempt(siteId, projId, sessionId, - attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), attempt.getWorkflowDefinitionId().orNull(), - AttemptStateFlags.empty().get(), attempt.getTimeZone().getId(), attempt.getParams()), - "session attempt name=%s in session id=%d", attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), sessionId), - "workflow definition id=%d", attempt.getWorkflowDefinitionId().orNull()); + catchConflict( + () -> { + // select id from sessions where id = for update + return dao.insertAttempt( + siteId, + projId, + sessionId, + attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), attempt.getWorkflowDefinitionId().orNull(), + AttemptStateFlags.empty().get(), attempt.getTimeZone().getId(), attempt.getParams() + ); + }, + "session attempt name=%s in session id=%d", + attempt.getRetryAttemptName().or(DEFAULT_ATTEMPT_NAME), sessionId + ), + "workflow definition id=%d", + attempt.getWorkflowDefinitionId().orNull() + ); dao.updateLastAttemptId(sessionId, attemptId); try { return requiredResource( @@ -1660,7 +1671,8 @@ public T putAndLockSession(Session session, SessionLockAction func) // select first so that conflicting insert (postgresql) or foreign key constraint violation (h2) // doesn't increment sequence of primary key unnecessarily - storedSession = dao.getSessionByConflictedNamesInternal( + // the session must be locked same as `dao.upsertAndLockSession()` + storedSession = dao.getAndLockSessionByConflictedNamesInternal( session.getProjectId(), session.getWorkflowName(), session.getSessionTime().getEpochSecond()); @@ -1676,7 +1688,7 @@ public T putAndLockSession(Session session, SessionLockAction func) return 0; }, "project id=%d", session.getProjectId()); - storedSession = dao.getSessionByConflictedNamesInternal( + storedSession = dao.getAndLockSessionByConflictedNamesInternal( session.getProjectId(), session.getWorkflowName(), session.getSessionTime().getEpochSecond()); @@ -2082,9 +2094,9 @@ List getAttemptsOfWorkflowWithRetries( " where project_id = :projectId" + " and workflow_name = :workflowName" + " and session_time = :sessionTime" + - " limit 1") // here allows last_attempt_id == NULL - StoredSession getSessionByConflictedNamesInternal(@Bind("projectId") int projectId, - @Bind("workflowName") String workflowName, @Bind("sessionTime") long sessionTime); + " limit 1 for update") // here allows last_attempt_id == NULL + StoredSession getAndLockSessionByConflictedNamesInternal(@Bind("projectId") int projectId, + @Bind("workflowName") String workflowName, @Bind("sessionTime") long sessionTime); @SqlQuery("select session_time from sessions" + " where project_id = :projectId" + diff --git a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java index 4bc1387fc6..931a3de51d 100644 --- a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java +++ b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java @@ -297,9 +297,6 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar SessionStore ss = sm.getSessionStore(siteId); long activeAttempts = ss.getActiveAttemptCount(); - if (activeAttempts + 1 > limits.maxAttempts()) { - throw new AttemptLimitExceededException("Too many attempts running. Limit: " + limits.maxAttempts() + ", Current: " + activeAttempts); - } stored = ss // putAndLockSession + insertAttempt might be able to be faster by combining them into one method and optimize using a single SQL with CTE @@ -327,6 +324,12 @@ public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar return storedAttemptWithSession; }); + + if (activeAttempts + 1 > limits.maxAttempts()) { + tm.reset(); + throw new AttemptLimitExceededException("Too many attempts running. Limit: " + limits.maxAttempts() + ", Current: " + activeAttempts); + } + } catch (WorkflowTaskLimitExceededException ex) { throw ex.getCause(); diff --git a/digdag-tests/src/test/java/acceptance/RequireIT.java b/digdag-tests/src/test/java/acceptance/RequireIT.java index 638fb1bda4..be54e28b7a 100644 --- a/digdag-tests/src/test/java/acceptance/RequireIT.java +++ b/digdag-tests/src/test/java/acceptance/RequireIT.java @@ -12,6 +12,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import utils.CommandStatus; import utils.TemporaryDigdagServer; @@ -43,6 +45,8 @@ public class RequireIT { + private static Logger logger = LoggerFactory.getLogger(RequireIT.class); + @Rule public TemporaryFolder folder = new TemporaryFolder(); @@ -231,13 +235,15 @@ public void testIgnoreProjectIdParam() @Test public void testRequireToAnotherProjectById() - throws Exception { + throws Exception + { testRequireToAnotherProject(true, "parent_by_id", "2020-06-05 00:00:01"); } @Test public void testRequireToAnotherProjectByName() - throws Exception { + throws Exception + { testRequireToAnotherProject(false, "parent_by_name", "2020-06-05 00:00:02"); } @@ -250,7 +256,8 @@ public void testRequireToAnotherProjectByName() * @throws Exception */ private void testRequireToAnotherProject(boolean useProjectId, String parentProjectName, String sessionTime) - throws Exception { + throws Exception + { final String childProjectName = "child_another"; // Push child project @@ -408,7 +415,10 @@ private RestSessionAttemptCollection testRerunOnParam(String sessionTime, String return attempts; } - private static boolean isAttemptSuccess(CommandStatus status) { return status.outUtf8().contains("status: success"); } + private static boolean isAttemptSuccess(CommandStatus status) + { + return status.outUtf8().contains("status: success"); + } private CommandStatus startAndWait(String... args) throws InterruptedException { @@ -560,4 +570,217 @@ public void testTargetSessionIDWhenNoKick(boolean waitChildFinish) assertThat(requireOperatorStateParams.get("target_attempt_id", Id.class), is(targetAttemptId)); assertThat(requireOperatorStateParams.get("target_session_id", Id.class), is(targetSessionId)); } + + @Test + public void testRunningHitMaxAttempt() + throws Exception + { + // Only two attempts are able to run. + // In this situation, both parent_wait_long and child_wait_long must run successfully + try { + server.close(); + server = TemporaryDigdagServer.builder() + .configuration("executor.attempt_max_run = 2") + .build(); + server.start(); + + // Create a new project + CommandStatus initStatus = main("init", + "-c", config.toString(), + projectDir.toString()); + assertThat(initStatus.errUtf8(), initStatus.code(), is(0)); + + copyResource("acceptance/require/parent_wait_long.dig", projectDir.resolve("parent_wait_long.dig")); + copyResource("acceptance/require/child_wait_long.dig", projectDir.resolve("child_wait_long.dig")); + + // Push the project + CommandStatus pushStatus = main("push", + "--project", projectDir.toString(), + "require", + "-c", config.toString(), + "-e", server.endpoint(), + "-r", "4711"); + assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0)); + Id attemptId; + { + CommandStatus startStatus = main("start", + "-c", config.toString(), + "-e", server.endpoint(), + "require", "parent_wait_long", + "--session", "now"); + assertThat(startStatus.code(), is(0)); + attemptId = getAttemptId(startStatus); + } + + // Wait for the attempt to complete + boolean success = false; + for (int i = 0; i < 120; i++) { + CommandStatus attemptsStatus = main("attempts", + "-c", config.toString(), + "-e", server.endpoint(), + attemptId.toString()); + String statusStr = attemptsStatus.outUtf8(); + if (statusStr.contains("status: success")) { + success = true; + break; + } else if (statusStr.contains("status: error")) { + success = false; + logger.error("attempt failed: {}", statusStr); + break; + } + Thread.sleep(1000); + } + assertThat(success, is(true)); + } + finally { + if (server != null) { + server.close(); + } + } + } + + @Test + public void testDelayWithMaxAttempt() + throws Exception + { + // Scenario: + // Only two attempts are able to run. + // One attempt running firstly and run for some duration (reuse 'child_wait_long') + // Then create new attempt which has `require>` (kick 'parent_wait_long') + // Expected: + // The new attempt keep running. The task of `require>` will be retried after 10 min + // The way to confirm: + // To reduce the time of test, only check attempt log shows "Number of attempts or tasks exceed limit" + // + try { + server.close(); + server = TemporaryDigdagServer.builder() + .configuration("executor.attempt_max_run = 2") + .build(); + server.start(); + + // Create a new project + CommandStatus initStatus = main("init", + "-c", config.toString(), + projectDir.toString()); + assertThat(initStatus.errUtf8(), initStatus.code(), is(0)); + + copyResource("acceptance/require/parent_wait_long.dig", projectDir.resolve("parent_wait_long.dig")); + copyResource("acceptance/require/child_wait_long.dig", projectDir.resolve("child_wait_long.dig")); + + // Push the project + CommandStatus pushStatus = main("push", + "--project", projectDir.toString(), + "require", + "-c", config.toString(), + "-e", server.endpoint(), + "-r", "4711"); + assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0)); + { // create an attempt + CommandStatus startStatus = main("start", + "-c", config.toString(), + "-e", server.endpoint(), + "require", "child_wait_long", + "--session", "now"); + assertThat(startStatus.code(), is(0)); + } + + Id attemptId; + { + CommandStatus startStatus = main("start", + "-c", config.toString(), + "-e", server.endpoint(), + "require", "parent_wait_long", + "--session", "2022-12-01 12:34:56"); + assertThat(startStatus.code(), is(0)); + attemptId = getAttemptId(startStatus); + } + + { + CommandStatus status = main("attempts", + "-c", config.toString(), + "-e", server.endpoint()); + logger.info("{}", status.outUtf8()); + } + + String logStr = ""; + for (int i = 0; i < 60; i++) { + CommandStatus attemptLog = main("log", + "-c", config.toString(), + "-e", server.endpoint(), + attemptId.toString()); + logStr = attemptLog.outUtf8(); + if (logStr.contains("Number of attempts or tasks exceed limit")) { + return; // OK + } + Thread.sleep(1000); + } + fail("Cannot confirm retry happen. log:\n" + logStr); + } + finally { + if (server != null) { + server.close(); + } + } + } + + @Test + public void testMultipleAttemptsInSession() + throws Exception + { + // Check the issue that 'require>' will fail when run multiple attempts in a session in very short term + // Scenario: + // run "parent_multiple" for a specific session + // the workflow run tasks as follows: + // - firstly run `child_wait` for a session + // - run multiple attempts of `child_wait` for the session in parallel. + // Way to check: + // all attempts finish successfully + CommandStatus initStatus = main("init", + "-c", config.toString(), + projectDir.toString()); + assertThat(initStatus.errUtf8(), initStatus.code(), is(0)); + + copyResource("acceptance/require/parent_multiple.dig", projectDir.resolve("parent_multiple.dig")); + copyResource("acceptance/require/child_wait.dig", projectDir.resolve("child_wait.dig")); + + // Push the project + CommandStatus pushStatus = main("push", + "--project", projectDir.toString(), + "require", + "-c", config.toString(), + "-e", server.endpoint(), + "-r", "4711"); + assertThat(pushStatus.errUtf8(), pushStatus.code(), is(0)); + Id attemptId; + { + CommandStatus startStatus = main("start", + "-c", config.toString(), + "-e", server.endpoint(), + "require", "parent_multiple", + "--session", "now"); + assertThat(startStatus.code(), is(0)); + attemptId = getAttemptId(startStatus); + } + + // Wait for the attempt to complete + boolean success = false; + for (int i = 0; i < 180; i++) { + CommandStatus attemptsStatus = main("attempts", + "-c", config.toString(), + "-e", server.endpoint(), + attemptId.toString()); + String statusStr = attemptsStatus.outUtf8(); + if (statusStr.contains("status: success")) { + success = true; + break; + } else if (statusStr.contains("status: error")) { + success = false; + logger.error("attempt failed: {}", statusStr); + break; + } + Thread.sleep(1000); + } + assertThat(success, is(true)); + } } diff --git a/digdag-tests/src/test/resources/acceptance/require/child_wait_long.dig b/digdag-tests/src/test/resources/acceptance/require/child_wait_long.dig new file mode 100644 index 0000000000..8dac70a465 --- /dev/null +++ b/digdag-tests/src/test/resources/acceptance/require/child_wait_long.dig @@ -0,0 +1,3 @@ ++wait: + wait>: 30s + diff --git a/digdag-tests/src/test/resources/acceptance/require/parent_multiple.dig b/digdag-tests/src/test/resources/acceptance/require/parent_multiple.dig new file mode 100644 index 0000000000..c96793d2c2 --- /dev/null +++ b/digdag-tests/src/test/resources/acceptance/require/parent_multiple.dig @@ -0,0 +1,15 @@ ++t1: + require>: child_wait + session_time: ${session_time} + ++t2: + loop>: 20 + _parallel: true + _do: + +t2_1: + require>: child_wait + rerun_on: all + session_time: ${session_time} + ++t3: + echo>: "Finished successfully: ${session_time}" diff --git a/digdag-tests/src/test/resources/acceptance/require/parent_wait_long.dig b/digdag-tests/src/test/resources/acceptance/require/parent_wait_long.dig new file mode 100644 index 0000000000..d23c31bd58 --- /dev/null +++ b/digdag-tests/src/test/resources/acceptance/require/parent_wait_long.dig @@ -0,0 +1,3 @@ ++require: + require>: child_wait_long +