diff --git a/digdag-core/src/main/java/io/digdag/core/agent/MultiThreadAgent.java b/digdag-core/src/main/java/io/digdag/core/agent/MultiThreadAgent.java index 4766a4d100..c2db0930b2 100644 --- a/digdag-core/src/main/java/io/digdag/core/agent/MultiThreadAgent.java +++ b/digdag-core/src/main/java/io/digdag/core/agent/MultiThreadAgent.java @@ -122,9 +122,10 @@ public void run() int guaranteedAvaialbleThreads = executor.getMaximumPoolSize() - maximumActiveTasks; // Acquire at most guaranteedAvaialbleThreads or 10. This guarantees that all tasks start immediately. int maxAcquire = Math.min(guaranteedAvaialbleThreads, 10); + int actualyAcquired = 0; if (maxAcquire > 0) { metrics.summary(Category.AGENT,"mtag_NumMaxAcquire", maxAcquire); - transactionManager.begin(() -> { + actualyAcquired = transactionManager.begin(() -> { List reqs = taskServer.lockSharedAgentTasks(maxAcquire, agentId, config.getLockRetentionTime(), 1000); for (TaskRequest req : reqs) { executor.submit(() -> { @@ -144,10 +145,10 @@ public void run() }); activeTaskCount.incrementAndGet(); } - return null; + return reqs.size(); }); } - else { + if (actualyAcquired == 0) { metrics.increment(Category.AGENT, "mtag_RunWaitCounter"); // no executor thread is available. sleep for a while until a task execution finishes addActiveTaskLock.wait(500); diff --git a/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java b/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java index 574d8207ab..6f2b192e39 100644 --- a/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java +++ b/digdag-core/src/main/java/io/digdag/core/database/DatabaseSessionStoreManager.java @@ -282,16 +282,61 @@ public boolean isAnyNotDoneAttempts() ); } + private boolean isPostgres() + { + switch (databaseType) { + case "postgresql": + return true; + default: + return false; + } + } + @DigdagTimed(value = "dssm_", category = "db", appendMethodName = true) @Override - public List findAllReadyTaskIds(int maxEntries, boolean randomFetch) + public Optional tryLockReadyTask(TaskLockActionWithDetails func) { - if (randomFetch) { - return autoCommit((handle, dao) -> dao.findAllTaskIdsByStateAtRandom(TaskStateCode.READY.get(), maxEntries)); - } - else { - return autoCommit((handle, dao) -> dao.findAllTaskIdsByState(TaskStateCode.READY.get(), maxEntries)); - } + return transaction((handle, dao) -> { + if (isPostgres()) { + // FOR UPDATE with SKIP LOCKED is available only with PostgreSQL. + // Optimize two SELECTs into one SELECT with SKIP LOCKED. + StoredTask locked = handle.createQuery( + selectTaskDetailsQuery() + + " where state = " + TaskStateCode.READY.get() + + " limit 1" + + " for update of t skip locked" + ) + .map(stm) + .first(); + + if (locked != null) { + T result = func.call(new DatabaseTaskControlStore(handle), locked); + return Optional.of(result); + } + return Optional.absent(); + + } else { + // H2 doesn't support FOR UPDATE with JOIN. Use two SELECTs + Long lockedTaskId = handle.createQuery( + "select id from tasks" + + " where state = " + TaskStateCode.READY.get() + + " limit 1 for update") + .mapTo(Long.class) + .first(); + if (lockedTaskId != null) { + try { + StoredTask locked = getTaskById(handle, lockedTaskId); // this doesn't need + // FOR UPDATE clause because the row is already locked above + T result = func.call(new DatabaseTaskControlStore(handle), locked); + return Optional.of(result); + } + catch (ResourceNotFoundException ex) { + // Never reach here because above query blocks DELETE + } + } + return Optional.absent(); + } + }); } @DigdagTimed(value = "dssm_", category = "db", appendMethodName = true) @@ -1695,10 +1740,6 @@ void upsertAndLockSession(@Bind("projectId") int projectId, " where id = :id" + " for update") Long lockTaskIfNotLocked(@Bind("id") long taskId); - - @SqlQuery("select id from tasks where state = :state order by random() limit :limit") - List findAllTaskIdsByStateAtRandom(@Bind("state") short state, @Bind("limit") int limit); - } @UseStringTemplate3StatementLocator @@ -1744,9 +1785,6 @@ StoredSession upsertAndLockSession(@Bind("projectId") int projectId, " where id = :id" + " for update skip locked") Long lockTaskIfNotLocked(@Bind("id") long taskId); - - @SqlQuery("select id from tasks where state = :state order by random() limit :limit") - List findAllTaskIdsByStateAtRandom(@Bind("state") short state, @Bind("limit") int limit); } public interface Dao @@ -2043,11 +2081,6 @@ StoredSession getSessionByConflictedNamesInternal(@Bind("projectId") int project @GetGeneratedKeys long insertSessionMonitor(@Bind("attemptId") long attemptId, @Bind("nextRunTime") long nextRunTime, @Bind("type") String type, @Bind("config") Config config); - @SqlQuery("select id from tasks where state = :state limit :limit") - List findAllTaskIdsByState(@Bind("state") short state, @Bind("limit") int limit); - - List findAllTaskIdsByStateAtRandom(@Bind("state") short state, @Bind("limit") int limit); - @SqlQuery("select id, session_id, state_flags, index from session_attempts where id = :attemptId for update") SessionAttemptSummary lockAttempt(@Bind("attemptId") long attemptId); diff --git a/digdag-core/src/main/java/io/digdag/core/session/SessionStoreManager.java b/digdag-core/src/main/java/io/digdag/core/session/SessionStoreManager.java index 0a69b8d2ad..981059741b 100644 --- a/digdag-core/src/main/java/io/digdag/core/session/SessionStoreManager.java +++ b/digdag-core/src/main/java/io/digdag/core/session/SessionStoreManager.java @@ -26,16 +26,12 @@ AttemptStateFlags getAttemptStateFlags(long attemptId) // for WorkflowExecutor.runUntilAny boolean isAnyNotDoneAttempts(); - // for WorkflowExecutor.enqueueReadyTasks (Keep for compatibility) - default List findAllReadyTaskIds(int maxEntries) { return findAllReadyTaskIds(maxEntries, false); } - /** * for WorkflowExecutor.enqueueReadyTasks - * @param maxEntries max number to fetch * @param randomFetch fetch randomly or not(original behavior) * @return */ - List findAllReadyTaskIds(int maxEntries, boolean randomFetch); + Optional tryLockReadyTask(TaskLockActionWithDetails func); // for AttemptTimeoutEnforcer.enforceAttemptTTLs diff --git a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java index 4a03e906b4..37c671cf93 100644 --- a/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java +++ b/digdag-core/src/main/java/io/digdag/core/workflow/WorkflowExecutor.java @@ -33,6 +33,7 @@ import io.digdag.core.session.StoredTask; import io.digdag.core.session.Task; import io.digdag.core.session.TaskAttemptSummary; +import io.digdag.core.session.TaskControlStore; import io.digdag.core.session.TaskStateCode; import io.digdag.core.session.TaskStateFlags; import io.digdag.metrics.DigdagTimed; @@ -62,6 +63,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.Supplier; @@ -178,8 +180,7 @@ public class WorkflowExecutor private final Lock propagatorLock = new ReentrantLock(); private final Condition propagatorCondition = propagatorLock.newCondition(); private volatile boolean propagatorNotice = false; - private final boolean enqueueRandomFetch; - private final Integer enqueueFetchSize; + private final int bulkEnqueueSize; @Inject public WorkflowExecutor( @@ -204,8 +205,7 @@ public WorkflowExecutor( this.systemConfig = systemConfig; this.limits = limits; this.metrics = metrics; - this.enqueueRandomFetch = systemConfig.get("executor.enqueue_random_fetch", Boolean.class, false); - this.enqueueFetchSize = systemConfig.get("executor.enqueue_fetch_size", Integer.class, 100); + this.bulkEnqueueSize = systemConfig.get("executor.bulk_enqueue_size", Integer.class, 100); } public StoredSessionAttemptWithSession submitWorkflow(int siteId, @@ -500,7 +500,7 @@ public void runWhile(BooleanSupplier cond) try (TaskQueuer queuer = new TaskQueuer()) { propagateBlockedChildrenToReady(); retryRetryWaitingTasks(); - enqueueReadyTasks(queuer); // TODO enqueue all (not only first 100) + enqueueReadyTasks(queuer); propagateAllPlannedToDone(); propagateSessionArchive(); @@ -519,7 +519,10 @@ public void runWhile(BooleanSupplier cond) propagateBlockedChildrenToReady(); retryRetryWaitingTasks(); - enqueueReadyTasks(queuer); + + // enqueue at most bulkEnqueueSize tasks. It may leave some + // ready tasks on the database not to be enqueued. + boolean hasMoreReadyTasks = enqueueReadyTasks(queuer); /** * propagateSessionArchive() should be always called. @@ -531,7 +534,7 @@ public void runWhile(BooleanSupplier cond) */ boolean hasModification = propagateAllPlannedToDone(); propagateSessionArchive(); - if (hasModification) { + if (hasMoreReadyTasks || hasModification) { //propagateSessionArchive(); } else { @@ -928,101 +931,98 @@ public void close() //} } - @VisibleForTesting - protected Function funcEnqueueTask() - { - return (tId) -> - tm.begin(() -> { - enqueueTask(dispatcher, tId); - return true; - }); - } - @DigdagTimed(category = "executor", appendMethodName = true) - protected void enqueueReadyTasks(TaskQueuer queuer) + protected boolean enqueueReadyTasks(TaskQueuer queuer) { - List readyTaskIds = tm.begin(() -> sm.findAllReadyTaskIds(enqueueFetchSize, enqueueRandomFetch)); - logger.trace("readyTaskIds:{}", readyTaskIds); - for (long taskId : readyTaskIds) { // TODO randomize this result to achieve concurrency - catching(()->funcEnqueueTask().apply(taskId), true, "Failed to call enqueueTask. taskId:" + taskId); - //queuer.asyncEnqueueTask(taskId); // TODO async queuing is probably unnecessary but not sure + for (int i = 0; i < bulkEnqueueSize; i++) { + boolean changed = tm.begin(() -> sm.tryLockReadyTask((store, task) -> { + catching(()->enqueueLockedTask(dispatcher, store, task), true, "Failed to call enqueueTask. taskId:" + task.getId()); + return true; + })).or(false); + if (!changed) { + // Break and return false if no tasks are ready + return false; + } } + return true; // Run next enqueueReadyTasks immediately without sleep at runWhile } @DigdagTimed(category="executor", appendMethodName = true) - protected void enqueueTask(final TaskQueueDispatcher dispatcher, final long taskId) + @VisibleForTesting + protected boolean enqueueLockedTask(final TaskQueueDispatcher dispatcher, + final TaskControlStore store, final StoredTask task) { - sm.lockTaskIfNotLocked(taskId, (store, task) -> { - TaskControl lockedTask = new TaskControl(store, task, limits); - if (lockedTask.getState() != TaskStateCode.READY) { - return false; - } - - if (task.getTaskType().isGroupingOnly()) { - return retryGroupingTask(lockedTask); - } + final long taskId = task.getId(); - // NOTE: Nothing to do here because CANCEL_REQUESTED task will be handled by an agent. - // See also state transitions. - - int siteId; - try { - siteId = sm.getSiteIdOfTask(taskId); - } - catch (ResourceNotFoundException ex) { - tm.reset(); - Exception error = new IllegalStateException("Task id="+taskId+" is ready to run but associated session attempt does not exist.", ex); - logger.error("Database state error enqueuing task.", error); - return false; - } + TaskControl lockedTask = new TaskControl(store, task, limits); + if (lockedTask.getState() != TaskStateCode.READY) { + return false; + } - try { - // TODO make queue name configurable. note that it also needs a new REST API and/or - // CLI ccommands to create/delete/manage queues. - Optional queueName = Optional.absent(); + if (task.getTaskType().isGroupingOnly()) { + return retryGroupingTask(lockedTask); + } - String encodedUnique = encodeUniqueQueuedTaskName(lockedTask.get()); + // NOTE: Nothing to do here because CANCEL_REQUESTED task will be handled by an agent. + // See also state transitions. - TaskQueueRequest request = TaskQueueRequest.builder() - .priority(0) // TODO make this configurable - .uniqueName(encodedUnique) - .data(Optional.absent()) - .build(); + int siteId; + try { + siteId = sm.getSiteIdOfTask(taskId); + } + catch (ResourceNotFoundException ex) { + tm.reset(); + Exception error = new IllegalStateException("Task id="+taskId+" is ready to run but associated session attempt does not exist.", ex); + logger.error("Database state error enqueuing task.", error); + return false; + } - logger.debug("Queuing task of attempt_id={}: id={} {}", task.getAttemptId(), task.getId(), task.getFullName()); - try { - dispatcher.dispatch(siteId, queueName, request); - } - catch (TaskConflictException ex) { - tm.reset(); - logger.warn("Task name {} is already queued in queue={} of site id={}. Skipped enqueuing", - encodedUnique, queueName.or(""), siteId); - } + try { + // TODO make queue name configurable. note that it also needs a new REST API and/or + // CLI ccommands to create/delete/manage queues. + Optional queueName = Optional.absent(); - //// - // don't throw exceptions after here. task is already dispatched to a queue - // + String encodedUnique = encodeUniqueQueuedTaskName(lockedTask.get()); - boolean updated = lockedTask.setReadyToRunning(); - if (!updated) { - // return value of setReadyToRunning must be true because this task is locked - // (won't be updated by other machines concurrently) and confirmed that - // current state is READY. - logger.warn("Unexpected state change failure from READY to RUNNING: {}", task); - } + TaskQueueRequest request = TaskQueueRequest.builder() + .priority(0) // TODO make this configurable + .uniqueName(encodedUnique) + .data(Optional.absent()) + .build(); - return updated; + logger.debug("Queuing task of attempt_id={}: id={} {}", task.getAttemptId(), task.getId(), task.getFullName()); + try { + dispatcher.dispatch(siteId, queueName, request); } - catch (Exception ex) { + catch (TaskConflictException ex) { tm.reset(); - logger.error( - LogMarkers.UNEXPECTED_SERVER_ERROR, - "Enqueue error, making this task failed: {}", task, ex); - // TODO retry here? - return taskFailed(lockedTask, - buildExceptionErrorConfig(ex).toConfig(cf)); + logger.warn("Task name {} is already queued in queue={} of site id={}. Skipped enqueuing", + encodedUnique, queueName.or(""), siteId); + } + + //// + // don't throw exceptions after here. task is already dispatched to a queue + // + + boolean updated = lockedTask.setReadyToRunning(); + if (!updated) { + // return value of setReadyToRunning must be true because this task is locked + // (won't be updated by other machines concurrently) and confirmed that + // current state is READY. + logger.warn("Unexpected state change failure from READY to RUNNING: {}", task); } - }).or(false); + + return updated; + } + catch (Exception ex) { + tm.reset(); + logger.error( + LogMarkers.UNEXPECTED_SERVER_ERROR, + "Enqueue error, making this task failed: {}", task, ex); + // TODO retry here? + return taskFailed(lockedTask, + buildExceptionErrorConfig(ex).toConfig(cf)); + } } private static String encodeUniqueQueuedTaskName(StoredTask task) diff --git a/digdag-core/src/test/java/io/digdag/core/workflow/WorkflowExecutorCatchingTest.java b/digdag-core/src/test/java/io/digdag/core/workflow/WorkflowExecutorCatchingTest.java index 872af1fa09..cab60111b1 100644 --- a/digdag-core/src/test/java/io/digdag/core/workflow/WorkflowExecutorCatchingTest.java +++ b/digdag-core/src/test/java/io/digdag/core/workflow/WorkflowExecutorCatchingTest.java @@ -11,7 +11,9 @@ import io.digdag.core.database.TransactionManager; import io.digdag.core.repository.ProjectStoreManager; import io.digdag.core.session.SessionStoreManager; +import io.digdag.core.session.StoredTask; import io.digdag.core.session.TaskAttemptSummary; +import io.digdag.core.session.TaskControlStore; import io.digdag.spi.CommandExecutor; import io.digdag.spi.metrics.DigdagMetrics; import org.junit.After; @@ -143,7 +145,8 @@ public WorkflowExecutorWithArbitraryErrors(ProjectStoreManager rm, } @Override - protected Function funcEnqueueTask() + protected boolean enqueueLockedTask(final TaskQueueDispatcher dispatcher, + final TaskControlStore store, final StoredTask task) { funcEnqueueTaskCounter++; logger.debug("funcEnqueueTask called:" + funcEnqueueTaskCounter); @@ -151,7 +154,7 @@ protected Function funcEnqueueTask() logger.info("funcEnqueueTask() throw Exception. counter=" + funcEnqueueTaskCounter); throw new RuntimeException("Unknown exception"); } - return super.funcEnqueueTask(); + return super.enqueueLockedTask(dispatcher, store, task); } @Override