Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 64 additions & 12 deletions sources/TokenExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,12 @@ class TokenExecutor: NSObject {
}

// Main queue only
// Returns true if the block was executed.
// Crashes (it_fatalError) on timeout since skipping the joined block would leave state inconsistent.
@objc
func whilePaused(_ block: () -> ()) {
self.impl.whilePaused(block, onExecutorQueue: onExecutorQueue)
@discardableResult
func whilePaused(_ block: () -> ()) -> Bool {
return self.impl.whilePaused(block, onExecutorQueue: onExecutorQueue)
}
}

Expand Down Expand Up @@ -448,41 +451,90 @@ private class TokenExecutorImpl {

// Main queue
// Runs block synchronously while token executor is stopped.
func whilePaused(_ block: () -> (), onExecutorQueue: Bool) {
// Returns true if the block was executed.
// Crashes (it_fatalError) on timeout since skipping the joined block would leave state inconsistent.
@discardableResult
func whilePaused(_ block: () -> (), onExecutorQueue: Bool) -> Bool {
dispatchPrecondition(condition: .onQueue(.main))

#if DEBUG
// Detect reentrant joins
let wasInProgress = Self.joinInProgress.getAndSet(true)
it_assert(!wasInProgress, "Reentrant join detected - check call stack")
defer { Self.joinInProgress.set(false) }
#endif

if gDebugLogging.boolValue { DLog("Incr pending pauses if \(iTermPreferences.maximizeThroughput())") }
var unpauser = iTermPreferences.maximizeThroughput() ? nil : TokenExecutor.globalPause()

let sema = DispatchSemaphore(value: 0)
let sema2 = DispatchSemaphore(value: 0)
// Cancellation flag - checked by async block before setting joined = true.
// This prevents the async block from setting joined state if main times out.
// NOTE: There's a small race window where the async block may have passed the
// cancel check but not yet signaled sema2. Under it_fatalError this is acceptable;
// if soft-failure is ever needed, a stronger handshake (e.g., DispatchWorkItem.cancel)
// would be required.
let cancelled = MutableAtomicObject(false)

queue.async {
// If main timed out and cancelled, don't set joined state
if cancelled.value {
DLog("Join was cancelled due to timeout - not setting joined state")
return
}
sema2.signal()
iTermGCD.joined = true
sema.wait()
iTermGCD.joined = false
DLog("Mutation queue unpaused")
}
if unpauser != nil {
sema2.wait()
} else {
// When maximize throughput is on, we want to avoid pausing token execution but we can't
// let it go on indefinitely. After a timeout, pause it and block for what should be a short
// amount of time (until the current batch of tokens is done being executed).
let timeout = 0.03
if sema2.wait(timeout: .now() + timeout) == .timedOut {

// Timeouts prevent deadlock if mutation queue is stuck.
// DEBUG: Short timeouts catch issues during development.
// RELEASE: Longer timeouts avoid false positives from legitimately busy queues,
// but still catch truly stuck queues.
#if DEBUG
let initialTimeout: TimeInterval = unpauser != nil ? 1.0 : 0.03
let retryTimeout: TimeInterval = 1.0
#else
let initialTimeout: TimeInterval = unpauser != nil ? 5.0 : 0.03
let retryTimeout: TimeInterval = 10.0
#endif

if sema2.wait(timeout: .now() + initialTimeout) == .timedOut {
if unpauser == nil {
// When maximize throughput is on, we want to avoid pausing token execution but we can't
// let it go on indefinitely. After a timeout, pause it and block for what should be a short
// amount of time (until the current batch of tokens is done being executed).
unpauser = TokenExecutor.globalPause()
sema2.wait()
}
// Try once more with pause in effect, but still use a timeout
if sema2.wait(timeout: .now() + retryTimeout) == .timedOut {
DLog("FATAL: whilePaused timed out waiting for mutation queue")
// Cancel the async block before crashing so it doesn't set joined state
cancelled.set(true)
// Signal sema in case the async block is waiting
sema.signal()
unpauser?.unpause()
it_fatalError("whilePaused timed out waiting for mutation queue - join required for state coherence")
}
}

DLog("Mutation queue paused")
block()
if unpauser != nil {
if gDebugLogging.boolValue { DLog("Decr pending pauses") }
}
unpauser?.unpause()
sema.signal()
return true
}

#if DEBUG
private static let joinInProgress = MutableAtomicObject(false)
#endif

// Any queue
func addSideEffect(_ task: @escaping TokenExecutorTask) {
sideEffects.append(task)
Expand Down
8 changes: 7 additions & 1 deletion sources/VT100ScreenMutableState.m
Original file line number Diff line number Diff line change
Expand Up @@ -4895,12 +4895,17 @@ - (void)performBlockWithJoinedThreads:(void (^ NS_NOESCAPE)(VT100Terminal *termi
}
}

- (void)performSynchroDanceWithBlock:(void (^)(void))block {
// Performs a synchronous join with the mutation queue.
// whilePaused: will crash (it_fatalError) if the join times out, so this method
// always returns YES if it returns at all. The BOOL return is kept for API clarity.
- (BOOL)performSynchroDanceWithBlock:(void (^)(void))block {
assert(dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL) == dispatch_queue_get_label(dispatch_get_main_queue()));
DLog(@"begin");
[iTermGCD setMainQueueSafe:YES];

// Stop the token executor while we run `block`.
// Note: whilePaused: crashes on timeout since skipping the joined block would
// leave mutable state inconsistent. If this returns, the join succeeded.
DLog(@"Calling whilePaused:");
[_tokenExecutor whilePaused:^{
// The token executor is now stopped. This runs on the main thread.
Expand All @@ -4911,6 +4916,7 @@ - (void)performSynchroDanceWithBlock:(void (^)(void))block {
DLog(@"unblock executor");
}];
DLog(@"returning");
return YES;
}

// This runs on the main queue while the mutation queue waits on `group`.
Expand Down