From 3360501d2bc53f6c5d09a5faa87bc0271f4a0dec Mon Sep 17 00:00:00 2001 From: chall37 Date: Fri, 20 Feb 2026 12:44:20 -0800 Subject: [PATCH] Add per-PTY dispatch sources as alternative to TaskNotifier select() loop Replace the shared TaskNotifier select() loop with per-PTY serial dispatch queues, each owning DispatchSource read/write/process sources. Gated behind the UsePerPTYDispatchSources advanced setting (default off). Key design: - PTYTaskIOHandler owns the ioQueue and all dispatch sources for one PTY - DISPATCH_SOURCE_TYPE_PROC monitors child exit to force-resume the read source for EOF delivery even when suspended for backpressure/pause - Backpressure integration: TokenExecutor tracks available slots via an atomic counter; when heavy backpressure is detected, wantsRead returns false and the read source suspends. A backpressureReleaseHandler wired in PTYSession re-evaluates read state when pressure drops - Backpressure-gated reads are restricted to per-PTY mode to avoid stalling the legacy TaskNotifier path which has no wake-up mechanism - Registration routing via PTYTask class methods so job managers do not need to know which I/O path is active - Coprocess sources are serialized on ioQueue to prevent data races with event handlers Includes comprehensive test coverage: lifecycle, state predicates, EOF detection, read/write round-trips, coprocess management, and concurrency regression tests. --- .../DispatchSourceConcurrencyTests.swift | 157 ++++ .../DispatchSourceCoprocessTests.swift | 160 ++++ .../DispatchSourceEOFTests.swift | 300 +++++++ .../DispatchSourceLifecycleTests.swift | 286 +++++++ .../DispatchSourceReadWriteTests.swift | 575 +++++++++++++ .../DispatchSourceStateTests.swift | 769 ++++++++++++++++++ .../DispatchSources/MockJobManager.swift | 87 ++ .../DispatchSources/MockPTYTaskDelegate.swift | 158 ++++ ModernTests/DispatchSources/TestHelpers.swift | 89 ++ iTerm2.xcodeproj/project.pbxproj | 4 + sources/PTYSession.m | 13 + sources/PTYTask+Private.h | 2 + sources/PTYTask.h | 39 + sources/PTYTask.m | 353 +++++++- sources/PTYTaskIOHandler.swift | 438 ++++++++++ sources/TokenArray.swift | 19 +- sources/TokenExecutor.swift | 85 +- sources/iTerm2SharedARC-Bridging-Header.h | 1 + sources/iTermAdvancedSettingsModel.h | 1 + sources/iTermAdvancedSettingsModel.m | 1 + sources/iTermLegacyJobManager.m | 3 +- sources/iTermMonoServerJobManager.m | 5 +- sources/iTermMultiServerJobManager.m | 5 +- 23 files changed, 3526 insertions(+), 24 deletions(-) create mode 100644 ModernTests/DispatchSources/DispatchSourceConcurrencyTests.swift create mode 100644 ModernTests/DispatchSources/DispatchSourceCoprocessTests.swift create mode 100644 ModernTests/DispatchSources/DispatchSourceEOFTests.swift create mode 100644 ModernTests/DispatchSources/DispatchSourceLifecycleTests.swift create mode 100644 ModernTests/DispatchSources/DispatchSourceReadWriteTests.swift create mode 100644 ModernTests/DispatchSources/DispatchSourceStateTests.swift create mode 100644 ModernTests/DispatchSources/MockJobManager.swift create mode 100644 ModernTests/DispatchSources/MockPTYTaskDelegate.swift create mode 100644 ModernTests/DispatchSources/TestHelpers.swift create mode 100644 sources/PTYTaskIOHandler.swift diff --git a/ModernTests/DispatchSources/DispatchSourceConcurrencyTests.swift b/ModernTests/DispatchSources/DispatchSourceConcurrencyTests.swift new file mode 100644 index 0000000000..a255b4ae0e --- /dev/null +++ b/ModernTests/DispatchSources/DispatchSourceConcurrencyTests.swift @@ -0,0 +1,157 @@ +// +// DispatchSourceConcurrencyTests.swift +// ModernTests +// +// Regression tests for data races in dispatch source management. +// These verify that concurrent operations on PTYTaskIOHandler don't crash. +// + +import XCTest +@testable import iTerm2SharedARC + +// MARK: - Coprocess Setup Serialization Tests + +/// Regression test: setupCoprocessSources must serialize source +/// reference assignments on the ioQueue to prevent data races with event +/// handlers and updateCoprocess*SourceState. +final class DispatchSourceCoprocessSerializationTests: XCTestCase { + + /// Rapid sequential setup/teardown of coprocess sources while the primary + /// sources are active. Tests that the syncOnIOQueue wrapper properly + /// serializes reference assignments with event handlers. + func testRapidCoprocessSetupTeardownCycles() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let ptyPipe = createTestPipe() else { + XCTFail("Failed to create PTY pipe") + return + } + defer { closeTestPipe(ptyPipe) } + + task.testSetFd(ptyPipe.readFd) + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + guard let coprocessPipe = createTestPipe() else { + XCTFail("Failed to create coprocess pipe") + task.testTeardownDispatchSourcesForTesting() + return + } + defer { closeTestPipe(coprocessPipe) } + + // Rapid setup/teardown cycles — each must properly cancel old sources + // and assign new references atomically on ioQueue. + for _ in 0..<20 { + task.testSetupCoprocessSources(withReadFd: coprocessPipe.readFd, + writeFd: coprocessPipe.writeFd) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testHasCoprocessReadSource(), + "Coprocess read source should exist after setup") + XCTAssertTrue(task.testHasCoprocessWriteSource(), + "Coprocess write source should exist after setup") + + task.testTeardownCoprocessSources() + task.testWaitForIOQueue() + + XCTAssertFalse(task.testHasCoprocessReadSource(), + "Coprocess read source should be nil after teardown") + XCTAssertFalse(task.testHasCoprocessWriteSource(), + "Coprocess write source should be nil after teardown") + } + + task.testTeardownDispatchSourcesForTesting() + } + +} + +// MARK: - Concurrent Teardown + Update Tests + +/// Regression test: teardown() concurrent with +/// updateReadSourceState/updateWriteSourceState must not crash. +/// Before the centralized helpers, source references were read from the +/// calling queue and then mutated on ioQueue, creating a TOCTOU race. +final class DispatchSourceConcurrentTeardownUpdateTests: XCTestCase { + + func testConcurrentTeardownAndUpdateDoesNotCrash() { + for _ in 0..<5 { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.testIoAllowedOverride = NSNumber(value: true) + task.paused = false + + task.testSetupDispatchSourcesForTesting(withPid: getpid()) + task.testWaitForIOQueue() + + let group = DispatchGroup() + + // Spam update calls from multiple threads (uses ioQueue.async internally) + for _ in 0..<10 { + group.enter() + DispatchQueue.global().async { + task.perform(NSSelectorFromString("updateReadSourceState")) + task.perform(NSSelectorFromString("updateWriteSourceState")) + group.leave() + } + } + + // Concurrently tear down (uses ioQueue.sync internally) + group.enter() + DispatchQueue.global().async { + task.testTeardownDispatchSourcesForTesting() + group.leave() + } + + let result = group.wait(timeout: .now() + 5.0) + XCTAssertEqual(result, .success, "Concurrent teardown+update timed out") + + XCTAssertFalse(task.testHasReadSource(), + "Read source should be nil after teardown") + XCTAssertFalse(task.testHasWriteSource(), + "Write source should be nil after teardown") + } + } + + func testRapidSetupTeardownCyclesDoNotCrash() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.testIoAllowedOverride = NSNumber(value: true) + + // Rapid setup/teardown cycles from the same thread + for _ in 0..<20 { + task.testSetupDispatchSourcesForTesting(withPid: getpid()) + task.testWaitForIOQueue() + task.paused = true + task.paused = false + task.testTeardownDispatchSourcesForTesting() + } + + XCTAssertFalse(task.testHasReadSource(), + "Read source should be nil after final teardown") + XCTAssertFalse(task.testHasWriteSource(), + "Write source should be nil after final teardown") + } +} diff --git a/ModernTests/DispatchSources/DispatchSourceCoprocessTests.swift b/ModernTests/DispatchSources/DispatchSourceCoprocessTests.swift new file mode 100644 index 0000000000..72f72c2ba7 --- /dev/null +++ b/ModernTests/DispatchSources/DispatchSourceCoprocessTests.swift @@ -0,0 +1,160 @@ +// +// DispatchSourceCoprocessTests.swift +// ModernTests +// +// Tests for coprocess dispatch source lifecycle: setup, teardown, +// suspend/resume state, and interaction with primary source teardown. +// + +import XCTest +@testable import iTerm2SharedARC + +// MARK: - Coprocess Dispatch Source Tests + +/// Tests for coprocess dispatch source setup, teardown, and state transitions. +final class DispatchSourceCoprocessTests: XCTestCase { + + var task: PTYTask! + /// Primary PTY pipe — needed for testSetupDispatchSourcesForTesting. + var ptyPipe: (readFd: Int32, writeFd: Int32)! + /// Coprocess pipe — used as the coprocess read/write fds. + var coprocessPipe: (readFd: Int32, writeFd: Int32)! + + override func setUp() { + super.setUp() + task = PTYTask() + + ptyPipe = createTestPipe() + XCTAssertNotNil(ptyPipe, "Failed to create PTY pipe") + + coprocessPipe = createTestPipe() + XCTAssertNotNil(coprocessPipe, "Failed to create coprocess pipe") + + task.testSetFd(ptyPipe.readFd) + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + } + + override func tearDown() { + task.testTeardownDispatchSourcesForTesting() + if ptyPipe != nil { closeTestPipe(ptyPipe) } + if coprocessPipe != nil { closeTestPipe(coprocessPipe) } + task = nil + super.tearDown() + } + + // MARK: - Setup + + func testSetupCoprocessSourcesCreatesSources() { + XCTAssertFalse(task.testHasCoprocessReadSource(), "No coprocess read source before setup") + XCTAssertFalse(task.testHasCoprocessWriteSource(), "No coprocess write source before setup") + + task.testSetupCoprocessSources(withReadFd: coprocessPipe.readFd, writeFd: coprocessPipe.writeFd) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testHasCoprocessReadSource(), "Coprocess read source should exist after setup") + XCTAssertTrue(task.testHasCoprocessWriteSource(), "Coprocess write source should exist after setup") + } + + func testCoprocessSourcesStartSuspended() { + task.testSetupCoprocessSources(withReadFd: coprocessPipe.readFd, writeFd: coprocessPipe.writeFd) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testIsCoprocessReadSourceSuspended(), + "Coprocess read source should start suspended") + XCTAssertTrue(task.testIsCoprocessWriteSourceSuspended(), + "Coprocess write source should start suspended") + } + + // MARK: - Teardown + + func testTeardownCoprocessSourcesCleansUp() { + task.testSetupCoprocessSources(withReadFd: coprocessPipe.readFd, writeFd: coprocessPipe.writeFd) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testHasCoprocessReadSource(), "Coprocess read source should exist") + XCTAssertTrue(task.testHasCoprocessWriteSource(), "Coprocess write source should exist") + + task.testTeardownCoprocessSources() + task.testWaitForIOQueue() + + XCTAssertFalse(task.testHasCoprocessReadSource(), "Coprocess read source should be nil after teardown") + XCTAssertFalse(task.testHasCoprocessWriteSource(), "Coprocess write source should be nil after teardown") + } + + func testTeardownCoprocessSourcesSafeWithoutSetup() { + XCTAssertFalse(task.testHasCoprocessReadSource(), "No coprocess read source before setup") + XCTAssertFalse(task.testHasCoprocessWriteSource(), "No coprocess write source before setup") + + // Should not crash + task.testTeardownCoprocessSources() + task.testWaitForIOQueue() + + XCTAssertFalse(task.testHasCoprocessReadSource(), "Still no coprocess read source") + XCTAssertFalse(task.testHasCoprocessWriteSource(), "Still no coprocess write source") + } + + func testDoubleTeardownCoprocessSourcesSafe() { + task.testSetupCoprocessSources(withReadFd: coprocessPipe.readFd, writeFd: coprocessPipe.writeFd) + task.testWaitForIOQueue() + + task.testTeardownCoprocessSources() + task.testWaitForIOQueue() + + // Second teardown should not crash + task.testTeardownCoprocessSources() + task.testWaitForIOQueue() + + XCTAssertFalse(task.testHasCoprocessReadSource(), "No coprocess read source after double teardown") + XCTAssertFalse(task.testHasCoprocessWriteSource(), "No coprocess write source after double teardown") + } + + // MARK: - Replacement + + /// Regression: setupCoprocessSources must tear down existing sources before creating new ones. + func testSetupCoprocessSourcesTearsDownExisting() { + task.testSetupCoprocessSources(withReadFd: coprocessPipe.readFd, writeFd: coprocessPipe.writeFd) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testHasCoprocessReadSource(), "First coprocess read source should exist") + + // Create a second pipe for the replacement coprocess + guard let secondPipe = createTestPipe() else { + XCTFail("Failed to create second coprocess pipe") + return + } + defer { closeTestPipe(secondPipe) } + + // Second setup should tear down first sources, then create new ones — no crash + task.testSetupCoprocessSources(withReadFd: secondPipe.readFd, writeFd: secondPipe.writeFd) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testHasCoprocessReadSource(), "Replacement coprocess read source should exist") + XCTAssertTrue(task.testHasCoprocessWriteSource(), "Replacement coprocess write source should exist") + } + + // MARK: - Primary teardown interaction + + /// teardown() on PTYTaskIOHandler calls teardownCoprocessSources(), so tearing + /// down primary sources must also clean up coprocess sources. + func testPrimaryTeardownAlsoTearsDownCoprocessSources() { + task.testSetupCoprocessSources(withReadFd: coprocessPipe.readFd, writeFd: coprocessPipe.writeFd) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testHasCoprocessReadSource(), "Coprocess read source should exist") + XCTAssertTrue(task.testHasCoprocessWriteSource(), "Coprocess write source should exist") + + // Primary teardown should also clean up coprocess sources + task.testTeardownDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertFalse(task.testHasCoprocessReadSource(), + "Coprocess read source should be nil after primary teardown") + XCTAssertFalse(task.testHasCoprocessWriteSource(), + "Coprocess write source should be nil after primary teardown") + + // Re-setup primary sources so tearDown() doesn't double-teardown + task.testSetFd(ptyPipe.readFd) + task.testSetupDispatchSourcesForTesting() + } +} diff --git a/ModernTests/DispatchSources/DispatchSourceEOFTests.swift b/ModernTests/DispatchSources/DispatchSourceEOFTests.swift new file mode 100644 index 0000000000..9a695d671b --- /dev/null +++ b/ModernTests/DispatchSources/DispatchSourceEOFTests.swift @@ -0,0 +1,300 @@ +// +// DispatchSourceEOFTests.swift +// ModernTests +// +// Tests for EOF propagation and broken pipe handling with dispatch sources. +// + +import XCTest +@testable import iTerm2SharedARC + +// MARK: - EOF Propagation Tests + +/// Tests that EOF on the PTY fd (read returning 0) correctly triggers brokenPipe. +final class DispatchSourceEOFTests: XCTestCase { + + var task: PTYTask! + var mockDelegate: MockPTYTaskDelegate! + var pipe: (readFd: Int32, writeFd: Int32)! + + override func setUp() { + super.setUp() + task = PTYTask() + mockDelegate = MockPTYTaskDelegate() + task.delegate = mockDelegate + + pipe = createTestPipe() + XCTAssertNotNil(pipe, "Failed to create test pipe") + + task.testSetFd(pipe.readFd) + task.testIoAllowedOverride = NSNumber(value: true) + task.testSetupDispatchSourcesForTesting() + } + + override func tearDown() { + task.testTeardownDispatchSourcesForTesting() + if pipe != nil { + close(pipe.writeFd) + } + task.delegate = nil + task = nil + mockDelegate = nil + super.tearDown() + } + + /// Closing the write end of the pipe triggers EOF on the read end. + /// handleReadEvent must detect this and call brokenPipe. + func testEOFTriggersBrokenPipe() { + close(pipe.writeFd) + + let result = waitForCondition({ self.task.hasBrokenPipe() }, timeout: 2.0) + XCTAssertTrue(result, "brokenPipe should be set after EOF") + XCTAssertTrue(mockDelegate.threadedBrokenPipeCalled, + "threadedTaskBrokenPipe should be called on EOF") + } + + /// When there's buffered data followed by EOF, the data must be delivered + /// before brokenPipe is called. + func testEOFDeliversBufferedDataFirst() { + let testMessage = "hello from PTY" + + writeToFd(pipe.writeFd, data: testMessage) + close(pipe.writeFd) + + let result = waitForCondition({ self.task.hasBrokenPipe() }, timeout: 2.0) + XCTAssertTrue(result, "brokenPipe should be set after EOF") + + XCTAssertFalse(mockDelegate.readData.isEmpty, + "Data should be delivered before brokenPipe") + let received = mockDelegate.readData.reduce(Data(), +) + let receivedString = String(data: received, encoding: .utf8) + XCTAssertEqual(receivedString, testMessage, + "All buffered data should be delivered before EOF processing") + } +} + +// MARK: - EOF While Paused Tests + +/// Regression test: EOF must be detected even when the task is paused. +/// Pausing suspends the read source. The proc source (DISPATCH_SOURCE_TYPE_PROC) +/// detects process exit and force-resumes the read source for EOF delivery. +/// In tests we simulate this with testSimulateProcessExit(). +final class DispatchSourceEOFWhilePausedTests: XCTestCase { + + var task: PTYTask! + var mockDelegate: MockPTYTaskDelegate! + var pipe: (readFd: Int32, writeFd: Int32)! + + override func setUp() { + super.setUp() + task = PTYTask() + mockDelegate = MockPTYTaskDelegate() + task.delegate = mockDelegate + + pipe = createTestPipe() + XCTAssertNotNil(pipe, "Failed to create test pipe") + + task.testSetFd(pipe.readFd) + task.testIoAllowedOverride = NSNumber(value: true) + task.testSetupDispatchSourcesForTesting() + } + + override func tearDown() { + task.testTeardownDispatchSourcesForTesting() + if pipe != nil { + close(pipe.writeFd) + } + task.delegate = nil + task = nil + mockDelegate = nil + super.tearDown() + } + + /// Closing the write end while paused must still trigger brokenPipe. + /// The proc source detects process exit and force-resumes the read source + /// so EOF can be delivered. We simulate this with testSimulateProcessExit(). + func testEOFDetectedWhilePaused() { + task.testWaitForIOQueue() + + task.paused = true + task.testWaitForIOQueue() + + // EOF while paused + close(pipe.writeFd) + + // Simulate the proc source firing (real child processes get a real proc source) + task.testSimulateProcessExit() + task.testWaitForIOQueue() + + let result = waitForCondition({ self.task.hasBrokenPipe() }, timeout: 2.0) + XCTAssertTrue(result, "brokenPipe must be detected even while paused") + XCTAssertTrue(mockDelegate.threadedBrokenPipeCalled, + "threadedTaskBrokenPipe must be called even while paused") + } + + /// Buffered data followed by EOF while paused must drain data + /// before reporting brokenPipe, once the proc source force-resumes reads. + func testEOFWhilePausedDeliversBufferedData() { + task.testWaitForIOQueue() + + task.paused = true + task.testWaitForIOQueue() + + let testMessage = "data before EOF while paused" + writeToFd(pipe.writeFd, data: testMessage) + close(pipe.writeFd) + + // Simulate the proc source firing + task.testSimulateProcessExit() + task.testWaitForIOQueue() + + let result = waitForCondition({ self.task.hasBrokenPipe() }, timeout: 2.0) + XCTAssertTrue(result, "brokenPipe must be detected even while paused") + + XCTAssertFalse(mockDelegate.readData.isEmpty, + "Buffered data must be delivered even while paused") + let received = mockDelegate.readData.reduce(Data(), +) + let receivedString = String(data: received, encoding: .utf8) + XCTAssertEqual(receivedString, testMessage, + "All buffered data must be delivered before brokenPipe while paused") + } +} + +// MARK: - Dispatch Source Teardown on brokenPipe Tests + +/// Tests that brokenPipe tears down dispatch sources, preventing post-deregister +/// handler invocations on a dead fd. +final class DispatchSourceBrokenPipeTeardownTests: XCTestCase { + + var task: PTYTask! + var mockDelegate: MockPTYTaskDelegate! + var pipe: (readFd: Int32, writeFd: Int32)! + + override func setUp() { + super.setUp() + task = PTYTask() + mockDelegate = MockPTYTaskDelegate() + task.delegate = mockDelegate + + pipe = createTestPipe() + XCTAssertNotNil(pipe, "Failed to create test pipe") + + task.testSetFd(pipe.readFd) + task.testIoAllowedOverride = NSNumber(value: true) + task.testSetupDispatchSourcesForTesting() + } + + override func tearDown() { + task.testTeardownDispatchSourcesForTesting() + if pipe != nil { + close(pipe.writeFd) + } + task.delegate = nil + task = nil + mockDelegate = nil + super.tearDown() + } + + /// After brokenPipe (triggered by EOF), dispatch sources must be torn down. + func testBrokenPipeTeardownsDispatchSources() { + task.testWaitForIOQueue() + XCTAssertTrue(task.testHasReadSource(), "Read source should exist before EOF") + XCTAssertTrue(task.testHasWriteSource(), "Write source should exist before EOF") + + close(pipe.writeFd) + + let result = waitForCondition({ self.task.hasBrokenPipe() }, timeout: 2.0) + XCTAssertTrue(result, "brokenPipe should be set after EOF") + + task.testWaitForIOQueue() + + XCTAssertFalse(task.testHasReadSource(), "Read source must be nil after brokenPipe") + XCTAssertFalse(task.testHasWriteSource(), "Write source must be nil after brokenPipe") + } + + /// After brokenPipe, the read source must not fire again. + func testNoReadEventsAfterBrokenPipe() { + close(pipe.writeFd) + + let result = waitForCondition({ self.task.hasBrokenPipe() }, timeout: 2.0) + XCTAssertTrue(result, "brokenPipe should be set after EOF") + + task.testWaitForIOQueue() + let readCountAfterBrokenPipe = mockDelegate.readData.count + + Thread.sleep(forTimeInterval: 0.1) + task.testWaitForIOQueue() + + XCTAssertEqual(mockDelegate.readData.count, readCountAfterBrokenPipe, + "No new read events should fire after brokenPipe tears down sources") + } +} + +// MARK: - Edge Case Tests + +/// Tests for PTYTask edge cases and nil-safety +final class DispatchSourceEdgeCaseTests: XCTestCase { + + func testFreshTaskHasValidState() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + XCTAssertFalse(task.paused, "Fresh task should not be paused") + XCTAssertEqual(task.fd, -1, "Fresh task should have invalid fd") + } + + func testTaskWithNilDelegate() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.delegate = nil + + task.paused = true + XCTAssertTrue(task.paused, "Pause should work with nil delegate") + + task.paused = false + XCTAssertFalse(task.paused, "Unpause should work with nil delegate") + + XCTAssertFalse(task.wantsRead, "wantsRead should be false without job manager") + XCTAssertFalse(task.wantsWrite, "wantsWrite should be false without job manager") + + let readSelector = NSSelectorFromString("updateReadSourceState") + let writeSelector = NSSelectorFromString("updateWriteSourceState") + + if task.responds(to: readSelector) { + task.perform(readSelector) + } + if task.responds(to: writeSelector) { + task.perform(writeSelector) + } + + XCTAssertFalse(task.testHasReadSource(), "No read source with nil delegate") + XCTAssertFalse(task.testHasWriteSource(), "No write source with nil delegate") + } + + func testConcurrentPauseChanges() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + let group = DispatchGroup() + + for _ in 0..<10 { + group.enter() + DispatchQueue.global().async { + for _ in 0..<100 { + task.paused = true + task.paused = false + } + group.leave() + } + } + + group.wait() + } +} diff --git a/ModernTests/DispatchSources/DispatchSourceLifecycleTests.swift b/ModernTests/DispatchSources/DispatchSourceLifecycleTests.swift new file mode 100644 index 0000000000..8bb9214384 --- /dev/null +++ b/ModernTests/DispatchSources/DispatchSourceLifecycleTests.swift @@ -0,0 +1,286 @@ +// +// DispatchSourceLifecycleTests.swift +// ModernTests +// +// Tests for dispatch source setup, teardown, and idempotent operations. +// + +import XCTest +@testable import iTerm2SharedARC + +// MARK: - Dispatch Source Lifecycle Tests + +/// Tests for dispatch source setup and teardown +final class DispatchSourceLifecycleTests: XCTestCase { + + func testSetupCreatesSourcesWhenFdValid() throws { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + + XCTAssertFalse(task.testHasReadSource(), "No read source before setup") + XCTAssertFalse(task.testHasWriteSource(), "No write source before setup") + + task.testSetupDispatchSourcesForTesting(withPid: getpid()) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testHasReadSource(), "Read source should be created") + XCTAssertTrue(task.testHasWriteSource(), "Write source should be created") + + // Write source should be suspended (empty buffer) + XCTAssertTrue(task.testIsWriteSourceSuspended(), "Write source should start suspended (empty buffer)") + + // Read source suspends when paused (proc source handles EOF detection) + task.paused = true + task.testWaitForIOQueue() + XCTAssertTrue(task.testIsReadSourceSuspended(), "Read source should be suspended when paused") + + task.testTeardownDispatchSourcesForTesting() + } + + func testTeardownCleansUpSources() throws { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertTrue(task.testHasReadSource(), "Read source should exist after setup") + XCTAssertTrue(task.testHasWriteSource(), "Write source should exist after setup") + + task.testTeardownDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertFalse(task.testHasReadSource(), "Read source should be nil after teardown") + XCTAssertFalse(task.testHasWriteSource(), "Write source should be nil after teardown") + } + + func testUpdateMethodsExist() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + let readSelector = NSSelectorFromString("updateReadSourceState") + let writeSelector = NSSelectorFromString("updateWriteSourceState") + + XCTAssertTrue(task.responds(to: readSelector), + "PTYTask should have updateReadSourceState") + XCTAssertTrue(task.responds(to: writeSelector), + "PTYTask should have updateWriteSourceState") + } + + func testTeardownIsSafeWithoutSetup() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + XCTAssertFalse(task.testHasReadSource(), "No read source should exist before setup") + XCTAssertFalse(task.testHasWriteSource(), "No write source should exist before setup") + + let selector = NSSelectorFromString("teardownDispatchSources") + if task.responds(to: selector) { + task.perform(selector) + } + + XCTAssertFalse(task.testHasReadSource(), "No read source after teardown on fresh task") + XCTAssertFalse(task.testHasWriteSource(), "No write source after teardown on fresh task") + } + + func testMultipleTeardownCallsSafe() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + let selector = NSSelectorFromString("teardownDispatchSources") + guard task.responds(to: selector) else { + XCTFail("PTYTask should respond to teardownDispatchSources") + return + } + + for i in 0..<5 { + task.perform(selector) + XCTAssertFalse(task.testHasReadSource(), "No read source after teardown \(i)") + XCTAssertFalse(task.testHasWriteSource(), "No write source after teardown \(i)") + } + } + + func testTeardownWithSuspendedReadSourceWhilePaused() throws { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.testSetupDispatchSourcesForTesting(withPid: getpid()) + task.testWaitForIOQueue() + + // Read source suspends when paused (proc source handles EOF detection) + task.paused = true + task.testWaitForIOQueue() + XCTAssertTrue(task.testIsReadSourceSuspended(), "Read source should be suspended when paused") + + // Teardown with suspended read source while paused - should NOT crash + task.testTeardownDispatchSourcesForTesting() + XCTAssertFalse(task.testHasReadSource(), "Read source should be nil after teardown") + } + + func testTeardownWithSuspendedWriteSource() throws { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertTrue(task.testIsWriteSourceSuspended(), "Write source should be suspended with empty buffer") + + // Teardown with suspended write source - should NOT crash + task.testTeardownDispatchSourcesForTesting() + XCTAssertFalse(task.testHasWriteSource(), "Write source should be nil after teardown") + } + + func testTeardownWhilePausedWithBothSourcesSuspended() throws { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.testSetupDispatchSourcesForTesting(withPid: getpid()) + task.testWaitForIOQueue() + + task.paused = true + task.testWaitForIOQueue() + + // Both sources suspend when paused (proc source handles EOF detection) + XCTAssertTrue(task.testIsReadSourceSuspended(), "Read source should be suspended when paused") + XCTAssertTrue(task.testIsWriteSourceSuspended(), "Write source should be suspended (empty buffer)") + + // Teardown with both sources suspended - should NOT crash + task.testTeardownDispatchSourcesForTesting() + XCTAssertFalse(task.testHasReadSource(), "Read source should be nil after teardown") + XCTAssertFalse(task.testHasWriteSource(), "Write source should be nil after teardown") + } + + /// Regression test: closeFileDescriptorAndDeregisterIfPossible must tear down + /// dispatch sources before the job manager closes the fd. Otherwise the sources + /// remain active on a potentially reused descriptor. + func testCloseFileDescriptorTearsDownSourcesFirst() throws { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + // testSetFd creates an iTermLegacyJobManager, so set the mock AFTER + task.testSetFd(pipe.readFd) + let mockJobManager = MockJobManager() + mockJobManager.fd = pipe.readFd + task.testSetJobManager(mockJobManager) + task.testIoAllowedOverride = NSNumber(value: true) + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertTrue(task.testHasReadSource(), "Read source should exist after setup") + XCTAssertTrue(task.testHasWriteSource(), "Write source should exist after setup") + + // This must tear down sources before closing the fd + task.closeFileDescriptorAndDeregisterIfPossible() + + XCTAssertFalse(task.testHasReadSource(), + "Read source should be torn down after closeFileDescriptorAndDeregisterIfPossible") + XCTAssertFalse(task.testHasWriteSource(), + "Write source should be torn down after closeFileDescriptorAndDeregisterIfPossible") + XCTAssertEqual(mockJobManager.closeFileDescriptorCallCount, 1, + "Job manager closeFileDescriptor should have been called") + } + + /// Regression test: When registration is dispatched async to the main queue, + /// the task may be stopped (fd closed) before didRegister runs. didRegister + /// must guard against fd < 0 and skip ioHandler creation. + func testDidRegisterSkipsIOHandlerWhenFdClosed() throws { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + // A fresh PTYTask has fd = -1 (no job manager fd yet). + // This simulates the race where the task is stopped before didRegister runs. + XCTAssertEqual(task.fd, -1, "Fresh task should have fd = -1") + + // Enable the per-PTY dispatch sources setting for this test + let key = "UsePerPTYDispatchSources" + let oldValue = UserDefaults.standard.object(forKey: key) + UserDefaults.standard.set(true, forKey: key) + iTermAdvancedSettingsModel.loadAdvancedSettingsFromUserDefaults() + defer { + if let old = oldValue { + UserDefaults.standard.set(old, forKey: key) + } else { + UserDefaults.standard.removeObject(forKey: key) + } + iTermAdvancedSettingsModel.loadAdvancedSettingsFromUserDefaults() + } + + XCTAssertTrue(iTermAdvancedSettingsModel.usePerPTYDispatchSources(), + "Setting should be enabled for this test") + + // Call didRegister — should NOT crash and should NOT create an ioHandler. + // didRegister is declared on the iTermTask protocol; use perform() to call it. + task.perform(NSSelectorFromString("didRegister")) + + // No sources should exist because didRegister guarded against fd < 0 + XCTAssertFalse(task.testHasReadSource(), + "No read source should be created when fd is closed at registration time") + XCTAssertFalse(task.testHasWriteSource(), + "No write source should be created when fd is closed at registration time") + } +} diff --git a/ModernTests/DispatchSources/DispatchSourceReadWriteTests.swift b/ModernTests/DispatchSources/DispatchSourceReadWriteTests.swift new file mode 100644 index 0000000000..b43c9845f5 --- /dev/null +++ b/ModernTests/DispatchSources/DispatchSourceReadWriteTests.swift @@ -0,0 +1,575 @@ +// +// DispatchSourceReadWriteTests.swift +// ModernTests +// +// Read handler pipeline and write path round-trip tests for dispatch sources. +// + +import XCTest +@testable import iTerm2SharedARC + +// MARK: - Event Handler Tests + +/// Tests for handler method existence and wiring +final class DispatchSourceEventHandlerTests: XCTestCase { + + func testHandleReadEventWiring() { + // PTYTaskIOHandler's handleReadEvent is private, invoked by dispatch source. + // Verify the handler is correctly wired by checking that dispatch sources are created. + + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertTrue(task.testHasReadSource(), + "Read source should exist (handleReadEvent is internal to handler)") + + task.testTeardownDispatchSourcesForTesting() + } + + func testHandleWriteEventWiring() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertTrue(task.testHasWriteSource(), + "Write source should exist (handleWriteEvent is internal to handler)") + + task.testTeardownDispatchSourcesForTesting() + } + + func testProcessReadMethodExists() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + XCTAssertTrue(task.responds(to: #selector(task.processRead)), + "PTYTask should have processRead method") + } + + func testProcessWriteMethodExists() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + XCTAssertTrue(task.responds(to: #selector(task.processWrite)), + "PTYTask should have processWrite method") + } +} + +// MARK: - Read Handler Pipeline Tests + +/// Tests for the read handler pipeline (data on fd -> read -> delegate) +final class DispatchSourceReadPipelineTests: XCTestCase { + + func testReadSourceTriggersThreadedReadTask() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + let mockDelegate = MockPTYTaskDelegate() + task.delegate = mockDelegate + task.testSetFd(pipe.readFd) + task.paused = false + + let readExpectation = XCTestExpectation(description: "threadedReadTask called") + mockDelegate.onThreadedRead = { _ in + readExpectation.fulfill() + } + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertFalse(task.testIsReadSourceSuspended(), "Read source should be resumed") + + let testMessage = "Hello from read handler test!" + let testData = testMessage.data(using: .utf8)! + let bytesWritten = testData.withUnsafeBytes { bufferPointer -> Int in + let rawPointer = bufferPointer.baseAddress! + return Darwin.write(pipe.writeFd, rawPointer, testData.count) + } + XCTAssertEqual(bytesWritten, testData.count, "Should write all bytes to pipe") + + wait(for: [readExpectation], timeout: 2.0) + + XCTAssertGreaterThan(mockDelegate.getReadCount(), 0, "threadedReadTask should be called") + + if let receivedData = mockDelegate.getLastReadData() { + let receivedString = String(data: receivedData, encoding: .utf8) + XCTAssertEqual(receivedString, testMessage, "Delegate should receive the written data") + } else { + XCTFail("Delegate should have received data") + } + + task.testTeardownDispatchSourcesForTesting() + } + + func testReadHandlerDoesNotBlock() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + let mockDelegate = MockPTYTaskDelegate() + task.delegate = mockDelegate + task.testSetFd(pipe.readFd) + task.paused = false + + let readExpectation = XCTestExpectation(description: "Quick read") + mockDelegate.onThreadedRead = { _ in + readExpectation.fulfill() + } + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + let startTime = CFAbsoluteTimeGetCurrent() + + let testData = "Quick read test".data(using: .utf8)! + testData.withUnsafeBytes { bufferPointer in + let rawPointer = bufferPointer.baseAddress! + _ = Darwin.write(pipe.writeFd, rawPointer, testData.count) + } + + wait(for: [readExpectation], timeout: 2.0) + let elapsed = CFAbsoluteTimeGetCurrent() - startTime + + XCTAssertLessThan(elapsed, 0.5, "Read handler should complete quickly (not block)") + + task.testTeardownDispatchSourcesForTesting() + } + + func testMultipleReadsAccumulate() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + let mockDelegate = MockPTYTaskDelegate() + task.delegate = mockDelegate + task.testSetFd(pipe.readFd) + task.testIoAllowedOverride = NSNumber(value: true) + task.paused = false + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + var totalReceived = Data() + let lock = NSLock() + + let messages = ["First", "Second", "Third"] + let expectedBytes = messages.reduce(0) { $0 + $1.data(using: .utf8)!.count } + + mockDelegate.onThreadedRead = { data in + lock.lock() + totalReceived.append(data) + lock.unlock() + } + + for msg in messages { + let data = msg.data(using: .utf8)! + data.withUnsafeBytes { bufferPointer in + let rawPointer = bufferPointer.baseAddress! + _ = Darwin.write(pipe.writeFd, rawPointer, data.count) + } + } + + var allDataReceived = false + for _ in 0..<200 { + task.testWaitForIOQueue() + lock.lock() + let received = totalReceived.count >= expectedBytes + lock.unlock() + if received { + allDataReceived = true + break + } + } + XCTAssertTrue(allDataReceived, "All data should be received") + + lock.lock() + let receivedString = String(data: totalReceived, encoding: .utf8) ?? "" + lock.unlock() + + for msg in messages { + XCTAssertTrue(receivedString.contains(msg), "Should receive message: \(msg)") + } + + task.testTeardownDispatchSourcesForTesting() + } +} + +// MARK: - Write Path Tests + +/// Tests for write buffer and write source behavior +final class DispatchSourceWriteHandlerTests: XCTestCase { + + func testWriteBufferDidChangeWakesWriteSource() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.paused = false + task.testShouldWriteOverride = true + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertTrue(task.testIsWriteSourceSuspended(), "Write source should start suspended (empty buffer)") + XCTAssertFalse(task.testWriteBufferHasData(), "Write buffer should be empty initially") + + let testData = "Hello".data(using: .utf8)! + task.testAppendData(toWriteBuffer: testData) + + XCTAssertTrue(task.testWriteBufferHasData(), "Write buffer should have data after append") + XCTAssertTrue(task.wantsWrite, + "wantsWrite should be true with override and data in buffer") + + task.perform(NSSelectorFromString("writeBufferDidChange")) + task.testWaitForIOQueue() + + var bufferDrained = false + for _ in 0..<100 { + task.testWaitForIOQueue() + if !task.testWriteBufferHasData() { + bufferDrained = true + break + } + } + XCTAssertTrue(bufferDrained, "Write buffer should be drained after write source fires") + + task.testShouldWriteOverride = false + task.testTeardownDispatchSourcesForTesting() + } + + func testWriteSourceResumesWhenBufferFills() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.paused = false + task.testShouldWriteOverride = true + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertFalse(task.testWriteBufferHasData(), "Buffer should be empty initially") + XCTAssertTrue(task.testIsWriteSourceSuspended(), "Write source should be suspended with empty buffer") + + let testData = "Test data for write source".data(using: .utf8)! + task.testAppendData(toWriteBuffer: testData) + XCTAssertTrue(task.testWriteBufferHasData(), "Buffer should have data after append") + XCTAssertTrue(task.wantsWrite, "wantsWrite should be true with override and data in buffer") + + task.perform(NSSelectorFromString("writeBufferDidChange")) + task.testWaitForIOQueue() + + var bufferDrained = false + for _ in 0..<100 { + task.testWaitForIOQueue() + if !task.testWriteBufferHasData() { + bufferDrained = true + break + } + } + XCTAssertTrue(bufferDrained, "Buffer should be drained after write source fires") + + task.testShouldWriteOverride = false + task.testTeardownDispatchSourcesForTesting() + } + + func testWriteSourceSuspendResumeCycleViaPause() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.testShouldWriteOverride = true + task.paused = true + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertFalse(task.testWriteBufferHasData(), "Buffer should be empty initially") + XCTAssertTrue(task.testIsWriteSourceSuspended(), "Write source should be SUSPENDED when paused") + + let testData = "Data for resume test".data(using: .utf8)! + task.testAppendData(toWriteBuffer: testData) + XCTAssertTrue(task.testWriteBufferHasData(), "Buffer should have data after append") + + task.perform(NSSelectorFromString("writeBufferDidChange")) + task.testWaitForIOQueue() + + XCTAssertFalse(task.wantsWrite, "wantsWrite should be false when paused") + XCTAssertTrue(task.testIsWriteSourceSuspended(), "Write source should stay SUSPENDED when paused") + XCTAssertTrue(task.testWriteBufferHasData(), "Buffer should still have data (no write occurred)") + + // Unpause - write source should RESUME and drain buffer + task.paused = false + task.perform(NSSelectorFromString("updateWriteSourceState")) + task.testWaitForIOQueue() + + var bufferDrained = false + for _ in 0..<100 { + task.testWaitForIOQueue() + if !task.testWriteBufferHasData() { + bufferDrained = true + break + } + } + XCTAssertTrue(bufferDrained, "Buffer should be drained after unpause triggers write") + + task.testShouldWriteOverride = false + task.testTeardownDispatchSourcesForTesting() + } +} + +// MARK: - Write Path Round-Trip Tests + +/// Tests that verify data written via writeTask: actually appears on the fd. +final class DispatchSourceWritePathRoundTripTests: XCTestCase { + + func testWriteTaskDataAppearsOnFd() throws { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.paused = false + task.testIoAllowedOverride = NSNumber(value: true) + + task.testSetupDispatchSourcesForTesting() + defer { task.testTeardownDispatchSourcesForTesting() } + task.testWaitForIOQueue() + + XCTAssertTrue(task.testIsWriteSourceSuspended(), "Write source should start suspended (empty buffer)") + + let testMessage = "Hello from keyboard!" + let testData = testMessage.data(using: .utf8)! + task.write(testData) + + var bufferDrained = false + for _ in 0..<100 { + task.testWaitForIOQueue() + if !task.testWriteBufferHasData() { + bufferDrained = true + break + } + } + XCTAssertTrue(bufferDrained, "Write buffer should be drained after write source fires") + + var readBuffer = [UInt8](repeating: 0, count: 256) + let bytesRead = Darwin.read(pipe.readFd, &readBuffer, readBuffer.count) + + XCTAssertGreaterThan(bytesRead, 0, "Should have read data from pipe") + if bytesRead > 0 { + let receivedData = Data(bytes: readBuffer, count: bytesRead) + let receivedString = String(data: receivedData, encoding: .utf8) + XCTAssertEqual(receivedString, testMessage, + "Data read from pipe should match what was written via writeTask:") + } + } + + func testWriteTaskWithoutIoAllowedDoesNotWrite() throws { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.paused = false + task.testIoAllowedOverride = NSNumber(value: false) + + task.testSetupDispatchSourcesForTesting() + defer { task.testTeardownDispatchSourcesForTesting() } + task.testWaitForIOQueue() + + let testData = "Should not appear".data(using: .utf8)! + task.write(testData) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testWriteBufferHasData(), + "Buffer should retain data when ioAllowed is false") + + var readBuffer = [UInt8](repeating: 0, count: 256) + let bytesRead = Darwin.read(pipe.readFd, &readBuffer, readBuffer.count) + XCTAssertEqual(bytesRead, -1, "Pipe should have no data (EAGAIN expected)") + XCTAssertEqual(errno, EAGAIN, "Read should return EAGAIN on empty non-blocking pipe") + } + + func testMultipleWriteTaskCallsAccumulate() throws { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.paused = false + task.testIoAllowedOverride = NSNumber(value: true) + + task.testSetupDispatchSourcesForTesting() + defer { task.testTeardownDispatchSourcesForTesting() } + task.testWaitForIOQueue() + + task.write("Hello ".data(using: .utf8)!) + task.write("World".data(using: .utf8)!) + task.write("!".data(using: .utf8)!) + + var bufferDrained = false + for _ in 0..<100 { + task.testWaitForIOQueue() + if !task.testWriteBufferHasData() { + bufferDrained = true + break + } + } + XCTAssertTrue(bufferDrained, "All data should be written") + + var allData = Data() + var readBuffer = [UInt8](repeating: 0, count: 256) + while true { + let bytesRead = Darwin.read(pipe.readFd, &readBuffer, readBuffer.count) + if bytesRead <= 0 { break } + allData.append(contentsOf: readBuffer[0.. 0 { + let received = String(data: Data(bytes: readBuffer, count: bytesRead), encoding: .utf8) + XCTAssertEqual(received, testMessage, "Written data should match") + } + } +} diff --git a/ModernTests/DispatchSources/DispatchSourceStateTests.swift b/ModernTests/DispatchSources/DispatchSourceStateTests.swift new file mode 100644 index 0000000000..db279eda12 --- /dev/null +++ b/ModernTests/DispatchSources/DispatchSourceStateTests.swift @@ -0,0 +1,769 @@ +// +// DispatchSourceStateTests.swift +// ModernTests +// +// State predicate and pause/unpause transition tests for dispatch sources. +// + +import XCTest +@testable import iTerm2SharedARC + +// MARK: - Read State Predicate Tests + +/// Tests for wantsRead predicate behavior +final class DispatchSourceReadStateTests: XCTestCase { + + func testWantsReadMethodExists() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + XCTAssertTrue(task.responds(to: #selector(getter: task.wantsRead)), + "PTYTask should have wantsRead property") + } + + func testWantsReadFalseWhenPaused() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = true + XCTAssertFalse(task.wantsRead, "wantsRead should return false when paused") + } + + func testWantsReadChangesWithPauseState() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = true + XCTAssertFalse(task.wantsRead, "wantsRead should be false when paused=true") + + task.paused = false + // wantsRead being true also requires ioAllowed + // We can only verify that pausing definitely makes it false + } + + func testUpdateReadSourceStateSafeWithoutSources() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + XCTAssertFalse(task.testHasReadSource(), "No read source before update") + + let selector = NSSelectorFromString("updateReadSourceState") + guard task.responds(to: selector) else { + XCTFail("PTYTask should respond to updateReadSourceState") + return + } + + for _ in 0..<3 { + task.perform(selector) + } + + XCTAssertFalse(task.testHasReadSource(), "updateReadSourceState should not create source") + } +} + +// MARK: - Write State Predicate Tests + +/// Tests for wantsWrite predicate behavior +final class DispatchSourceWriteStateTests: XCTestCase { + + func testWantsWriteMethodExists() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + XCTAssertTrue(task.responds(to: #selector(getter: task.wantsWrite)), + "PTYTask should have wantsWrite property") + } + + func testWantsWriteFalseWhenPaused() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = true + XCTAssertFalse(task.wantsWrite, "wantsWrite should return false when paused") + } + + func testWantsWriteFalseWhenBufferEmpty() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = false + XCTAssertFalse(task.wantsWrite, "wantsWrite should be false with empty buffer") + } + + func testShouldWriteOverrideProperty() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + XCTAssertFalse(task.testShouldWriteOverride, "Override should initially be false") + + task.testShouldWriteOverride = true + XCTAssertTrue(task.testShouldWriteOverride, "Override should be settable to true") + + task.testShouldWriteOverride = false + XCTAssertFalse(task.testShouldWriteOverride, "Override should be resettable to false") + + // Test that override affects wantsWrite with buffer data + task.testShouldWriteOverride = true + task.testIoAllowedOverride = NSNumber(value: true) + task.paused = false + + let testData = "Test data".data(using: .utf8)! + task.testAppendData(toWriteBuffer: testData) + XCTAssertTrue(task.testWriteBufferHasData(), "Buffer should have data after append") + XCTAssertTrue(task.wantsWrite, "wantsWrite should be true with override and data in buffer") + + task.testShouldWriteOverride = false + } + + func testUpdateWriteSourceStateSafeWithoutSources() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + XCTAssertFalse(task.testHasWriteSource(), "No write source before update") + + let selector = NSSelectorFromString("updateWriteSourceState") + guard task.responds(to: selector) else { + XCTFail("PTYTask should respond to updateWriteSourceState") + return + } + + for _ in 0..<3 { + task.perform(selector) + } + + XCTAssertFalse(task.testHasWriteSource(), "updateWriteSourceState should not create source") + } +} + +// MARK: - Pause State Tests + +/// Tests for pause state affecting dispatch source suspend/resume +final class DispatchSourcePauseStateTests: XCTestCase { + + func testPausedPropertyExists() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + let initialPaused = task.paused + task.paused = !initialPaused + XCTAssertEqual(task.paused, !initialPaused, "paused property should be settable") + task.paused = initialPaused + XCTAssertEqual(task.paused, initialPaused, "paused property should round-trip") + } + + func testPauseAffectsWantsRead() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = true + XCTAssertFalse(task.wantsRead, "wantsRead should be false when paused") + } + + func testPauseAffectsWantsWrite() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = true + XCTAssertFalse(task.wantsWrite, "wantsWrite should be false when paused") + } + + func testReadSourceSuspendsAndResumesWithPauseCycles() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.testIoAllowedOverride = NSNumber(value: true) + task.paused = true + + // Use nonzero pid so the read source can be suspended (pid=0 + // keeps it always active since there's no proc source for EOF). + task.testSetupDispatchSourcesForTesting(withPid: getpid()) + task.testWaitForIOQueue() + + // Read source suspends when paused (proc source handles EOF detection) + XCTAssertTrue(task.testIsReadSourceSuspended(), "Read source should be suspended when paused") + + task.paused = false + task.testWaitForIOQueue() + XCTAssertFalse(task.testIsReadSourceSuspended(), "Read source should resume when unpaused") + + task.paused = true + task.testWaitForIOQueue() + XCTAssertTrue(task.testIsReadSourceSuspended(), "Read source should suspend again when paused") + + task.testTeardownDispatchSourcesForTesting() + } + + func testReadSourceSuspendsOnPauseAndResumesOnUnpause() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.testIoAllowedOverride = NSNumber(value: true) + task.paused = false + + task.testSetupDispatchSourcesForTesting(withPid: getpid()) + task.testWaitForIOQueue() + + XCTAssertFalse(task.testIsReadSourceSuspended(), "Read source should be active when not paused") + + task.paused = true + task.testWaitForIOQueue() + XCTAssertTrue(task.testIsReadSourceSuspended(), "Read source should suspend when paused") + + task.paused = false + task.testWaitForIOQueue() + XCTAssertFalse(task.testIsReadSourceSuspended(), "Read source should resume when unpaused") + + task.testTeardownDispatchSourcesForTesting() + } + + func testReadSourceNeverReSuspendsAfterProcessExit() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.testIoAllowedOverride = NSNumber(value: true) + task.paused = false + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + // Simulate process exit — read source should never re-suspend after this + task.testSimulateProcessExit() + task.testWaitForIOQueue() + XCTAssertFalse(task.testIsReadSourceSuspended(), "Read source should be active after process exit") + + task.paused = true + task.testWaitForIOQueue() + XCTAssertFalse(task.testIsReadSourceSuspended(), + "Read source must not re-suspend after process exit, even when paused") + + task.testTeardownDispatchSourcesForTesting() + } + + /// Regression test: pid=0 tasks (tmux) must respect pause and backpressure + /// like any other task. There is no child process to exit — EOF arrives when + /// the tmux server closes the fd, and GCD queues that event for delivery + /// when the read source is next resumed. + func testReadSourceSuspendsNormallyWhenPidUnavailable() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.testIoAllowedOverride = NSNumber(value: true) + task.paused = false + + // pid=0: no proc source, no child process + task.testSetupDispatchSourcesForTesting() // defaults to pid=0 + task.testWaitForIOQueue() + + XCTAssertFalse(task.testIsReadSourceSuspended(), + "Read source should be active with pid=0 and not paused") + + // Pausing MUST suspend the read source even with pid=0 + task.paused = true + task.testWaitForIOQueue() + XCTAssertTrue(task.testIsReadSourceSuspended(), + "Read source must suspend when paused, even with pid=0") + + // Unpausing should resume + task.paused = false + task.testWaitForIOQueue() + XCTAssertFalse(task.testIsReadSourceSuspended(), + "Read source should resume when unpaused with pid=0") + + task.testTeardownDispatchSourcesForTesting() + } + + func testPauseUnpauseCycle() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = true + XCTAssertTrue(task.paused) + XCTAssertFalse(task.wantsRead, "wantsRead should be false when paused") + + task.paused = false + XCTAssertFalse(task.paused) + } + + func testRapidPauseUnpauseCycle() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + for _ in 0..<100 { + task.paused = true + task.paused = false + } + XCTAssertFalse(task.paused, "Should end in unpaused state") + } + + func testUpdateMethodsIdempotent() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + let readSelector = NSSelectorFromString("updateReadSourceState") + let writeSelector = NSSelectorFromString("updateWriteSourceState") + + guard task.responds(to: readSelector) && task.responds(to: writeSelector) else { + XCTFail("PTYTask should respond to update methods") + return + } + + let initialHasReadSource = task.testHasReadSource() + let initialHasWriteSource = task.testHasWriteSource() + + for _ in 0..<20 { + task.perform(readSelector) + task.perform(writeSelector) + } + + XCTAssertEqual(task.testHasReadSource(), initialHasReadSource, + "Read source state should remain stable") + XCTAssertEqual(task.testHasWriteSource(), initialHasWriteSource, + "Write source state should remain stable") + } +} + +// MARK: - ioAllowed Predicate Tests + +/// Tests for ioAllowed affecting wantsRead/wantsWrite predicates +final class DispatchSourceIoAllowedTests: XCTestCase { + + func testIoAllowedOverridePropertyExists() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + XCTAssertNil(task.testIoAllowedOverride, "testIoAllowedOverride should be nil by default") + + task.testIoAllowedOverride = NSNumber(value: true) + XCTAssertEqual(task.testIoAllowedOverride?.boolValue, true) + + task.testIoAllowedOverride = NSNumber(value: false) + XCTAssertEqual(task.testIoAllowedOverride?.boolValue, false) + + task.testIoAllowedOverride = nil + XCTAssertNil(task.testIoAllowedOverride) + } + + func testWantsReadFalseWhenIoAllowedFalse() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = false + task.testIoAllowedOverride = NSNumber(value: false) + + XCTAssertFalse(task.wantsRead, "wantsRead should be false when ioAllowed=false") + } + + func testWantsReadTrueWhenIoAllowedTrueAndNotPaused() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = false + task.testIoAllowedOverride = NSNumber(value: true) + + XCTAssertTrue(task.wantsRead, "wantsRead should be true when ioAllowed=true and not paused") + } + + func testWantsReadFlipsWhenIoAllowedChanges() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = false + + task.testIoAllowedOverride = NSNumber(value: true) + XCTAssertTrue(task.wantsRead, "wantsRead should be true when ioAllowed=true") + + task.testIoAllowedOverride = NSNumber(value: false) + XCTAssertFalse(task.wantsRead, "wantsRead should flip to false when ioAllowed flips to false") + + task.testIoAllowedOverride = NSNumber(value: true) + XCTAssertTrue(task.wantsRead, "wantsRead should flip back to true") + } + + func testIoAllowedFalseOverridesPausedFalse() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = false + task.testIoAllowedOverride = NSNumber(value: false) + + XCTAssertFalse(task.wantsRead, "ioAllowed=false should make wantsRead=false regardless of paused state") + } + + func testIoAllowedTrueDoesNotOverridePausedTrue() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = true + task.testIoAllowedOverride = NSNumber(value: true) + + XCTAssertFalse(task.wantsRead, "paused=true should make wantsRead=false regardless of ioAllowed") + } + + func testReadSourceSuspendsWhenIoAllowedBecomesFalse() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.paused = false + task.testIoAllowedOverride = NSNumber(value: true) + + task.testSetupDispatchSourcesForTesting(withPid: getpid()) + task.testWaitForIOQueue() + + XCTAssertFalse(task.testIsReadSourceSuspended(), "Read source should be active when ioAllowed=true") + + task.testIoAllowedOverride = NSNumber(value: false) + task.perform(NSSelectorFromString("updateReadSourceState")) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testIsReadSourceSuspended(), + "Read source should SUSPEND when ioAllowed becomes false") + + task.testTeardownDispatchSourcesForTesting() + } + + func testReadSourceResumesWhenIoAllowedBecomesTrue() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.paused = false + task.testIoAllowedOverride = NSNumber(value: false) + + task.testSetupDispatchSourcesForTesting(withPid: getpid()) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testIsReadSourceSuspended(), + "Read source should be suspended when ioAllowed=false") + + task.testIoAllowedOverride = NSNumber(value: true) + task.perform(NSSelectorFromString("updateReadSourceState")) + task.testWaitForIOQueue() + + XCTAssertFalse(task.testIsReadSourceSuspended(), + "Read source should RESUME when ioAllowed becomes true") + + task.testTeardownDispatchSourcesForTesting() + } + + func testWriteSourceSuspendsWhenIoAllowedBecomesFalse() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.paused = false + task.testIoAllowedOverride = NSNumber(value: true) + let testData = "Test data for write".data(using: .utf8)! + task.testAppendData(toWriteBuffer: testData) + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertFalse(task.testIsWriteSourceSuspended(), + "Write source should be resumed with ioAllowed=true and data in buffer") + + task.testIoAllowedOverride = NSNumber(value: false) + task.perform(NSSelectorFromString("updateWriteSourceState")) + task.testWaitForIOQueue() + + XCTAssertTrue(task.testIsWriteSourceSuspended(), + "Write source should SUSPEND when ioAllowed becomes false") + + task.testTeardownDispatchSourcesForTesting() + } + + func testWriteSourceResumesWhenIoAllowedBecomesTrue() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.paused = false + task.testIoAllowedOverride = NSNumber(value: false) + let testData = "Test data for write".data(using: .utf8)! + task.testAppendData(toWriteBuffer: testData) + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertTrue(task.testIsWriteSourceSuspended(), + "Write source should be suspended with ioAllowed=false") + + task.testIoAllowedOverride = NSNumber(value: true) + task.perform(NSSelectorFromString("updateWriteSourceState")) + task.testWaitForIOQueue() + + XCTAssertFalse(task.testIsWriteSourceSuspended(), + "Write source should RESUME when ioAllowed becomes true") + + task.testTeardownDispatchSourcesForTesting() + } + + func testWriteSourceStaysSuspendedWithoutData() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.paused = false + task.testIoAllowedOverride = NSNumber(value: true) + task.testShouldWriteOverride = true + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertTrue(task.testIsWriteSourceSuspended(), + "Write source should stay suspended without data in buffer") + + let testData = "Test data".data(using: .utf8)! + task.testAppendData(toWriteBuffer: testData) + task.perform(NSSelectorFromString("updateWriteSourceState")) + task.testWaitForIOQueue() + + XCTAssertFalse(task.testIsWriteSourceSuspended(), + "Write source should resume after data is added to buffer") + + task.testTeardownDispatchSourcesForTesting() + } + + func testWriteSourceSuspendsWhenPausedBecomesTrue() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.writeFd) + task.paused = false + task.testIoAllowedOverride = NSNumber(value: true) + task.testShouldWriteOverride = true + let testData = "Test data".data(using: .utf8)! + task.testAppendData(toWriteBuffer: testData) + + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + XCTAssertFalse(task.testIsWriteSourceSuspended(), + "Write source should be resumed when unpaused with data") + + task.paused = true + task.testWaitForIOQueue() + + XCTAssertTrue(task.testIsWriteSourceSuspended(), + "Write source should SUSPEND when paused becomes true") + + task.testTeardownDispatchSourcesForTesting() + } + + func testWantsWriteFalseWhenIoAllowedFalse() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = false + let testData = "Test data".data(using: .utf8)! + task.testAppendData(toWriteBuffer: testData) + XCTAssertTrue(task.testWriteBufferHasData(), "Buffer should have data") + + task.testIoAllowedOverride = NSNumber(value: false) + XCTAssertFalse(task.wantsWrite, "wantsWrite should be false when ioAllowed=false") + } + + /// Regression test: In legacy TaskNotifier mode (no ioHandler), wantsRead + /// must NOT check backpressure because there is no backpressureReleaseHandler + /// wired to wake reads. Suppressing reads without a wake-up path stalls output. + func testWantsReadIgnoresBackpressureInLegacyMode() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = false + task.testIoAllowedOverride = NSNumber(value: true) + + // Assign a mock that reports heavy backpressure + let mock = MockBackpressureSource() + mock.backpressureLevel = .heavy + task.tokenExecutor = mock + + // Without ioHandler (legacy mode), backpressure must be ignored + XCTAssertTrue(task.wantsRead, + "wantsRead must ignore backpressure in legacy TaskNotifier mode") + } + + /// Verify that per-PTY mode DOES suppress reads under heavy backpressure. + func testWantsReadRespectsBackpressureInPerPTYMode() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + guard let pipe = createTestPipe() else { + XCTFail("Failed to create test pipe") + return + } + defer { closeTestPipe(pipe) } + + task.testSetFd(pipe.readFd) + task.paused = false + task.testIoAllowedOverride = NSNumber(value: true) + + // Set up ioHandler (per-PTY mode) + task.testSetupDispatchSourcesForTesting() + task.testWaitForIOQueue() + + let mock = MockBackpressureSource() + mock.backpressureLevel = .heavy + task.tokenExecutor = mock + + XCTAssertFalse(task.wantsRead, + "wantsRead should suppress reads under heavy backpressure in per-PTY mode") + + mock.backpressureLevel = .none + XCTAssertTrue(task.wantsRead, + "wantsRead should allow reads when backpressure drops in per-PTY mode") + + task.testTeardownDispatchSourcesForTesting() + } + + func testWantsWriteTrueWhenAllConditionsMet() { + guard let task = PTYTask() else { + XCTFail("Failed to create PTYTask") + return + } + + task.paused = false + task.testShouldWriteOverride = true + + let testData = "Test data".data(using: .utf8)! + task.testAppendData(toWriteBuffer: testData) + XCTAssertTrue(task.testWriteBufferHasData(), "Buffer should have data") + + task.testIoAllowedOverride = NSNumber(value: true) + XCTAssertTrue(task.wantsWrite, "wantsWrite should be true when all conditions met") + + task.testShouldWriteOverride = false + } +} diff --git a/ModernTests/DispatchSources/MockJobManager.swift b/ModernTests/DispatchSources/MockJobManager.swift new file mode 100644 index 0000000000..93c80bcaca --- /dev/null +++ b/ModernTests/DispatchSources/MockJobManager.swift @@ -0,0 +1,87 @@ +// +// MockJobManager.swift +// ModernTests +// +// Minimal mock implementing iTermJobManager for testing PTYTask code paths +// that require a job manager (e.g., closeFileDescriptorAndDeregisterIfPossible). +// + +import Foundation +@testable import iTerm2SharedARC + +/// A minimal mock job manager for dispatch source tests. +/// +/// Only `closeFileDescriptor` and the required properties are implemented. +/// All other protocol methods are no-ops or stubs. +final class MockJobManager: NSObject, iTermJobManager { + + // MARK: - Test hooks + + /// Value returned by `closeFileDescriptor`. Defaults to true. + var closeFileDescriptorReturnValue: Bool = true + + /// Number of times `closeFileDescriptor` was called. + private(set) var closeFileDescriptorCallCount = 0 + + // MARK: - iTermJobManager required properties + + var fd: Int32 = -1 + var tty: String? = nil + + var externallyVisiblePid: pid_t { return 0 } + var hasJob: Bool { return false } + var sessionRestorationIdentifier: Any! { return nil } + var pidToWaitOn: pid_t { return 0 } + var isSessionRestorationPossible: Bool { return false } + var ioAllowed: Bool { return true } + var queue: dispatch_queue_t! { return DispatchQueue.main } + var isReadOnly: Bool { return false } + + // MARK: - iTermJobManager required class method + + static func available() -> Bool { return true } + + // MARK: - iTermJobManager required initializer + + required init!(queue: dispatch_queue_t!) { + super.init() + } + + override init() { + super.init() + } + + // MARK: - iTermJobManager required methods + + func forkAndExec(with ttyState: iTermTTYState, + argpath: String!, + argv: [String]!, + initialPwd: String!, + newEnviron: [String]!, + task: any iTermTask, + completion: ((iTermJobManagerForkAndExecStatus, NSNumber?) -> Void)!) { + // No-op for tests + } + + func attach(toServer serverConnection: iTermGeneralServerConnection, + withProcessID thePid: NSNumber!, + task: any iTermTask, + completion: ((iTermJobManagerAttachResults) -> Void)!) { + // No-op for tests + } + + func attach(toServer serverConnection: iTermGeneralServerConnection, + withProcessID thePid: NSNumber!, + task: any iTermTask) -> iTermJobManagerAttachResults { + return [] + } + + func kill(with mode: iTermJobManagerKillingMode) { + // No-op for tests + } + + func closeFileDescriptor() -> Bool { + closeFileDescriptorCallCount += 1 + return closeFileDescriptorReturnValue + } +} diff --git a/ModernTests/DispatchSources/MockPTYTaskDelegate.swift b/ModernTests/DispatchSources/MockPTYTaskDelegate.swift new file mode 100644 index 0000000000..5f7c5e2ef9 --- /dev/null +++ b/ModernTests/DispatchSources/MockPTYTaskDelegate.swift @@ -0,0 +1,158 @@ +// +// MockPTYTaskDelegate.swift +// ModernTests +// +// Mock implementation of PTYTaskDelegate for testing dispatch source integration. +// Provides call tracking and callbacks for verifying read handler pipeline. +// + +import Foundation +@testable import iTerm2SharedARC + +/// Mock delegate for testing the read handler pipeline in PTYTask. +final class MockPTYTaskDelegate: NSObject, PTYTaskDelegate { + + // MARK: - Configuration + + /// Callback invoked when threadedReadTask is called. + /// Use this to fulfill expectations or capture data in tests. + var onThreadedRead: ((Data) -> Void)? + + /// Callback invoked when taskDidRegister is called. + /// Use this to wire up dependencies during registration. + var onTaskDidRegister: ((PTYTask) -> Void)? + + // MARK: - Call Tracking + + private let lock = NSLock() + private var _readCallCount = 0 + private var _lastReadData: Data? + private var _readDataChunks: [Data] = [] + private var _brokenPipeCount = 0 + private var _threadedBrokenPipeCount = 0 + + var readCallCount: Int { + lock.lock() + defer { lock.unlock() } + return _readCallCount + } + + var lastReadData: Data? { + lock.lock() + defer { lock.unlock() } + return _lastReadData + } + + /// All data chunks received via threadedReadTask, in order. + var readDataChunks: [Data] { + lock.lock() + defer { lock.unlock() } + return _readDataChunks + } + + var brokenPipeCount: Int { + lock.lock() + defer { lock.unlock() } + return _brokenPipeCount + } + + var threadedBrokenPipeCount: Int { + lock.lock() + defer { lock.unlock() } + return _threadedBrokenPipeCount + } + + // MARK: - Convenience + + var threadedBrokenPipeCalled: Bool { + return threadedBrokenPipeCount > 0 + } + + var readData: [Data] { + return readDataChunks + } + + // MARK: - PTYTaskDelegate + + func threadedReadTask(_ buffer: UnsafeMutablePointer, length: Int32) { + lock.lock() + _readCallCount += 1 + let data = Data(bytes: buffer, count: Int(length)) + _lastReadData = data + _readDataChunks.append(data) + lock.unlock() + + onThreadedRead?(data) + } + + func threadedTaskBrokenPipe() { + lock.lock() + _threadedBrokenPipeCount += 1 + lock.unlock() + } + + func brokenPipe() { + lock.lock() + _brokenPipeCount += 1 + lock.unlock() + } + + func tmuxClientWrite(_ data: Data) { + // Not used in these tests + } + + func taskDiedImmediately() { + // Not used in these tests + } + + func taskDiedWithError(_ error: String!) { + // Not used in these tests + } + + func taskDidChangeTTY(_ task: PTYTask) { + // Not used in these tests + } + + func taskDidRegister(_ task: PTYTask) { + onTaskDidRegister?(task) + } + + func taskDidChangePaused(_ task: PTYTask, paused: Bool) { + // Not used in these tests + } + + func taskMuteCoprocessDidChange(_ task: PTYTask, hasMuteCoprocess: Bool) { + // Not used in these tests + } + + func taskDidResize(to gridSize: VT100GridSize, pixelSize: NSSize) { + // Not used in these tests + } + + func taskDidReadFromCoprocessWhileSSHIntegration(inUse data: Data) { + // Not used in these tests + } + + // MARK: - Test Helpers + + func reset() { + lock.lock() + _readCallCount = 0 + _lastReadData = nil + _readDataChunks = [] + _brokenPipeCount = 0 + _threadedBrokenPipeCount = 0 + onThreadedRead = nil + lock.unlock() + } + + /// Thread-safe accessor for read count + func getReadCount() -> Int { + return readCallCount + } + + /// Thread-safe accessor for last read data + func getLastReadData() -> Data? { + return lastReadData + } +} diff --git a/ModernTests/DispatchSources/TestHelpers.swift b/ModernTests/DispatchSources/TestHelpers.swift new file mode 100644 index 0000000000..124d441ce2 --- /dev/null +++ b/ModernTests/DispatchSources/TestHelpers.swift @@ -0,0 +1,89 @@ +// +// TestHelpers.swift +// ModernTests +// +// Shared test utilities for dispatch source tests. +// Provides pipe creation, queue synchronization, and condition waiting. +// + +import XCTest +@testable import iTerm2SharedARC + +// MARK: - Pipe Utilities + +/// Creates a non-blocking pipe for testing I/O operations. +/// - Returns: Tuple with read and write file descriptors, or nil on failure +func createTestPipe() -> (readFd: Int32, writeFd: Int32)? { + var fds: [Int32] = [0, 0] + guard pipe(&fds) == 0 else { return nil } + + // Set non-blocking on both ends + let readFlags = fcntl(fds[0], F_GETFL) + let writeFlags = fcntl(fds[1], F_GETFL) + _ = fcntl(fds[0], F_SETFL, readFlags | O_NONBLOCK) + _ = fcntl(fds[1], F_SETFL, writeFlags | O_NONBLOCK) + + return (fds[0], fds[1]) +} + +/// Closes both ends of a test pipe. +func closeTestPipe(_ fds: (readFd: Int32, writeFd: Int32)) { + close(fds.readFd) + close(fds.writeFd) +} + +/// Writes data to a file descriptor. +/// - Parameters: +/// - fd: File descriptor to write to +/// - data: String data to write +/// - Returns: Number of bytes written, or -1 on error +@discardableResult +func writeToFd(_ fd: Int32, data: String) -> Int { + return data.withCString { ptr in + Darwin.write(fd, ptr, strlen(ptr)) + } +} + +// MARK: - Queue Synchronization + +/// Waits for main queue to process all pending work. +func waitForMainQueue() { + if Thread.isMainThread { + // Already on main, run a spin through the run loop + RunLoop.current.run(until: Date()) + } else { + DispatchQueue.main.sync {} + } +} + +// MARK: - Mock Backpressure Source + +/// Minimal mock for PTYTask.tokenExecutor that exposes a settable backpressureLevel. +/// Used to test that wantsRead only checks backpressure in per-PTY mode. +final class MockBackpressureSource: NSObject { + @objc var backpressureLevel: BackpressureLevel = .none +} + +// MARK: - XCTestCase Extensions + +extension XCTestCase { + + /// Waits for a condition to become true, polling at intervals. + /// - Parameters: + /// - condition: Closure that returns true when condition is met + /// - timeout: Maximum time to wait + /// - pollInterval: Time between checks + /// - Returns: true if condition became true within timeout + func waitForCondition(_ condition: @escaping () -> Bool, + timeout: TimeInterval, + pollInterval: TimeInterval = 0.01) -> Bool { + let deadline = Date().addingTimeInterval(timeout) + while Date() < deadline { + if condition() { + return true + } + Thread.sleep(forTimeInterval: pollInterval) + } + return condition() + } +} diff --git a/iTerm2.xcodeproj/project.pbxproj b/iTerm2.xcodeproj/project.pbxproj index b3890d10ee..fdd4de5571 100644 --- a/iTerm2.xcodeproj/project.pbxproj +++ b/iTerm2.xcodeproj/project.pbxproj @@ -1223,6 +1223,7 @@ A616839A22F94AEE00661F71 /* GPBEnumArray+iTerm.m in Sources */ = {isa = PBXBuildFile; fileRef = A616839822F94AEE00661F71 /* GPBEnumArray+iTerm.m */; }; A616CF0D284143A300EF82D0 /* ConductorRecovery.swift in Sources */ = {isa = PBXBuildFile; fileRef = A616CF0C284143A300EF82D0 /* ConductorRecovery.swift */; }; A616CF112841AA1600EF82D0 /* WinSizeController.swift in Sources */ = {isa = PBXBuildFile; fileRef = A616CF102841AA1600EF82D0 /* WinSizeController.swift */; }; + 337071A3E6EEB42EF211A35D /* PTYTaskIOHandler.swift in Sources */ = {isa = PBXBuildFile; fileRef = BFE6B6E4B9840001204A07E1 /* PTYTaskIOHandler.swift */; }; A616D78B2EA7F167000C0C18 /* CyclicLog.swift in Sources */ = {isa = PBXBuildFile; fileRef = A616D78A2EA7F167000C0C18 /* CyclicLog.swift */; }; A616D78C2EA82E71000C0C18 /* iTermTipWindowController.m in Sources */ = {isa = PBXBuildFile; fileRef = 1DA76A041B30892900CB272A /* iTermTipWindowController.m */; }; A616D78E2EA85D9E000C0C18 /* AdapterPasswordDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = A616D78D2EA85D9E000C0C18 /* AdapterPasswordDataSource.swift */; }; @@ -8846,6 +8847,7 @@ A6EEAEDF28385A090039E850 /* PTYTask+ProcessInfo.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = "PTYTask+ProcessInfo.h"; sourceTree = ""; }; A6EEAEE028385A090039E850 /* PTYTask+ProcessInfo.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = "PTYTask+ProcessInfo.m"; sourceTree = ""; }; A6EEAEE328385A3F0039E850 /* PTYTask+Private.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = "PTYTask+Private.h"; sourceTree = ""; }; + BFE6B6E4B9840001204A07E1 /* PTYTaskIOHandler.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PTYTaskIOHandler.swift; sourceTree = ""; }; A6EED4612A804A2E006F0DB4 /* InputSourceForcer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = InputSourceForcer.swift; sourceTree = ""; }; A6EF57CF21994DDC00C76698 /* iTermUserDefaultsObserver.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = iTermUserDefaultsObserver.h; sourceTree = ""; }; A6EF57D021994DDC00C76698 /* iTermUserDefaultsObserver.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = iTermUserDefaultsObserver.m; sourceTree = ""; }; @@ -11683,6 +11685,7 @@ A6EEAEDF28385A090039E850 /* PTYTask+ProcessInfo.h */, A6EEAEE028385A090039E850 /* PTYTask+ProcessInfo.m */, A6EEAEE328385A3F0039E850 /* PTYTask+Private.h */, + BFE6B6E4B9840001204A07E1 /* PTYTaskIOHandler.swift */, A616CF0C284143A300EF82D0 /* ConductorRecovery.swift */, A616CF102841AA1600EF82D0 /* WinSizeController.swift */, A6D1C42128619E4800B68526 /* iTermTextDrawingHelperDelegate.h */, @@ -18912,6 +18915,7 @@ 5370678D21C9D2780088D0F3 /* SIGArchiveVerifier.m in Sources */, A61D2A4927555CD000E539F3 /* MenuItemPopupView.swift in Sources */, A616CF112841AA1600EF82D0 /* WinSizeController.swift in Sources */, + 337071A3E6EEB42EF211A35D /* PTYTaskIOHandler.swift in Sources */, A67960D21F81FCBB008A42BC /* iTermBadgeRenderer.m in Sources */, A653F6A924D00FFE0062377E /* iTermRestorableStateSQLite.m in Sources */, A6E112D725C615B400875D27 /* iTermHighlightLineTrigger.m in Sources */, diff --git a/sources/PTYSession.m b/sources/PTYSession.m index d017e176aa..6979235917 100644 --- a/sources/PTYSession.m +++ b/sources/PTYSession.m @@ -4039,6 +4039,19 @@ - (void)taskDidChangeTTY:(PTYTask *)task { // Main thread - (void)taskDidRegister:(PTYTask *)task { [self updateTTYSize]; + + // Wire up backpressure integration for dispatch sources path. + // Must happen before PTYTask starts its dispatch sources so that + // shouldRead can apply backpressure from the first read. + if ([iTermAdvancedSettingsModel usePerPTYDispatchSources]) { + iTermTokenExecutor *executor = _screen.mutableState.tokenExecutor; + task.tokenExecutor = executor; + + __weak PTYTask *weakTask = task; + executor.backpressureReleaseHandler = ^{ + [weakTask updateReadSourceState]; + }; + } } - (void)tmuxDidDisconnect { diff --git a/sources/PTYTask+Private.h b/sources/PTYTask+Private.h index 8a2fcc3eb0..7224a5adc6 100644 --- a/sources/PTYTask+Private.h +++ b/sources/PTYTask+Private.h @@ -17,4 +17,6 @@ @property(atomic, weak) iTermLoggingHelper *loggingHelper; @property(atomic, strong) id jobManager; +- (void)closeFileDescriptorAndDeregisterIfPossible; + @end diff --git a/sources/PTYTask.h b/sources/PTYTask.h index 1137f075a2..8ec7578ad1 100644 --- a/sources/PTYTask.h +++ b/sources/PTYTask.h @@ -17,6 +17,7 @@ @class iTermWinSizeController; @class PTYTab; @class PTYTask; +@class PTYTaskIOHandler; @protocol PTYTaskDelegate // Runs in a background thread. Should do as much work as possible in this @@ -187,6 +188,13 @@ typedef NS_OPTIONS(NSUInteger, iTermJobManagerAttachResults) { // This is used by channels. It takes care of handling IO and this is the one strong reference to the ioBuffer. @property(nonatomic, strong) iTermIOBuffer *ioBuffer; +// Typed as `id` to avoid requiring Swift header import in this header. +// Set by PTYSession in taskDidRegister: for backpressure integration. +@property(nonatomic, weak) id tokenExecutor; + +// Any queue. Captures shouldRead snapshot, then dispatches to ioQueue for source suspend/resume. +- (void)updateReadSourceState; + + (NSMutableDictionary *)mutableEnvironmentDictionary; - (instancetype)init; @@ -239,6 +247,13 @@ typedef NS_OPTIONS(NSUInteger, iTermJobManagerAttachResults) { - (void)killWithMode:(iTermJobManagerKillingMode)mode; +// Register or deregister a task. When usePerPTYDispatchSources is true, +// registration triggers didRegister (which creates the PTYTaskIOHandler). +// Otherwise, registration goes through TaskNotifier for the select() loop. +// Job managers should call these instead of TaskNotifier directly. ++ (void)registerTaskWithNotifier:(id)task; ++ (void)deregisterTaskFromNotifier:(id)task; + - (void)registerTmuxTask; - (void)getWorkingDirectoryWithCompletion:(void (^)(NSString *pwd))completion; @@ -251,3 +266,27 @@ typedef NS_OPTIONS(NSUInteger, iTermJobManagerAttachResults) { queue:(dispatch_queue_t)queue; @end + +@interface PTYTask (Testing) +- (BOOL)testHasReadSource; +- (BOOL)testHasWriteSource; +- (BOOL)testIsReadSourceSuspended; +- (BOOL)testIsWriteSourceSuspended; +- (BOOL)testWriteBufferHasData; +- (void)testSetFd:(int)fd; +- (void)testSetupDispatchSourcesForTesting; +- (void)testSetupDispatchSourcesForTestingWithPid:(pid_t)pid; +- (void)testTeardownDispatchSourcesForTesting; +- (void)testAppendDataToWriteBuffer:(NSData *)data; +- (void)testWaitForIOQueue; +- (void)testSetJobManager:(id)jobManager; +- (BOOL)testHasCoprocessReadSource; +- (BOOL)testHasCoprocessWriteSource; +- (BOOL)testIsCoprocessReadSourceSuspended; +- (BOOL)testIsCoprocessWriteSourceSuspended; +- (void)testSetupCoprocessSourcesWithReadFd:(int)readFd writeFd:(int)writeFd; +- (void)testTeardownCoprocessSources; +- (void)testSimulateProcessExit; +@property (nonatomic) BOOL testShouldWriteOverride; +@property (nonatomic, strong) NSNumber *testIoAllowedOverride; +@end diff --git a/sources/PTYTask.m b/sources/PTYTask.m index 0a1719c39d..36a0b32b06 100644 --- a/sources/PTYTask.m +++ b/sources/PTYTask.m @@ -44,6 +44,9 @@ #include #include +@interface PTYTask() +@end + @interface PTYTask(WinSizeControllerDelegate) @end @@ -72,6 +75,9 @@ @implementation PTYTask { dispatch_queue_t _jobManagerQueue; BOOL _isTmuxTask; + PTYTaskIOHandler *_ioHandler; // Per-PTY dispatch source handler. nil when usePerPTYDispatchSources is off. + BOOL _testShouldWriteOverride; // Test hook to override shouldWrite. Any queue. + NSNumber *_testIoAllowedOverride; // Test hook to override ioAllowed. Any queue. } - (instancetype)init { @@ -109,6 +115,7 @@ - (void)dealloc { } [self.ioBuffer invalidate]; + [_ioHandler teardown]; [self closeFileDescriptorAndDeregisterIfPossible]; @synchronized (self) { @@ -138,8 +145,14 @@ - (void)setPaused:(BOOL)paused { @synchronized(self) { _paused = paused; } - // Start/stop selecting on our FD - [[TaskNotifier sharedInstance] unblock]; + // In per-PTY mode the ioHandler manages source state directly; + // in legacy mode, unblock wakes select() to re-check wantsRead/wantsWrite. + if (_ioHandler) { + [_ioHandler updateReadSourceState]; + [_ioHandler updateWriteSourceState]; + } else { + [[TaskNotifier sharedInstance] unblock]; + } [_delegate taskDidChangePaused:self paused:paused]; } @@ -215,6 +228,14 @@ - (void)setCoprocess:(Coprocess *)coprocess { [weakSelf.delegate taskMuteCoprocessDidChange:self hasMuteCoprocess:self.hasMuteCoprocess]; }); [[TaskNotifier sharedInstance] unblock]; + if (_ioHandler) { + if (coprocess) { + [_ioHandler setupCoprocessSourcesWithReadFd:coprocess.readFileDescriptor + writeFd:coprocess.writeFileDescriptor]; + } else { + [_ioHandler teardownCoprocessSources]; + } + } } - (BOOL)writeBufferHasRoom { @@ -314,7 +335,7 @@ - (void)setReadOnlyFileDescriptor:(int)readOnlyFileDescriptor { jobManager.fd = readOnlyFileDescriptor; DLog(@"Configure %@ as tmux task", self); _jobManager = jobManager; - [[TaskNotifier sharedInstance] registerTask:self]; + [PTYTask registerTaskWithNotifier:self]; } - (void)setIoBuffer:(iTermIOBuffer *)ioBuffer { @@ -368,8 +389,12 @@ - (void)writeTask:(NSData *)data coprocess:(BOOL)fromCoprocessOutput { assert(!jobManager || !self.jobManager.isReadOnly); [writeLock lock]; [writeBuffer appendData:data]; - [[TaskNotifier sharedInstance] unblock]; [writeLock unlock]; + if (_ioHandler) { + [_ioHandler writeBufferDidChange]; + } else { + [[TaskNotifier sharedInstance] unblock]; + } } - (void)killWithMode:(iTermJobManagerKillingMode)mode { @@ -397,14 +422,25 @@ - (void)brokenPipe { @synchronized(self) { brokenPipe_ = YES; } - [[TaskNotifier sharedInstance] deregisterTask:self]; + [_ioHandler teardown]; + [PTYTask deregisterTaskFromNotifier:self]; [self.delegate threadedTaskBrokenPipe]; } -// Main queue +// Main queue. May run after the task has been stopped (registration is +// dispatched async), so guard against a closed fd. - (void)didRegister { DLog(@"didRegister %@", self); [self.delegate taskDidRegister:self]; + if ([iTermAdvancedSettingsModel usePerPTYDispatchSources]) { + if (self.fd < 0) { + DLog(@"didRegister: fd already closed, skipping ioHandler setup"); + return; + } + _ioHandler = [[PTYTaskIOHandler alloc] initWithFd:self.fd pid:self.pid]; + _ioHandler.delegate = self; + [_ioHandler start]; + } } // I did extensive benchmarking in May of 2025 when using the VT100_GANG optimization fully. @@ -479,6 +515,7 @@ - (void)processWrite { } - (void)stopCoprocess { + [_ioHandler teardownCoprocessSources]; pid_t thePid = 0; @synchronized (self) { if (coprocess_.pid > 0) { @@ -630,7 +667,7 @@ - (iTermJobManagerAttachResults)finishAttachingToMultiserver:(id= BackpressureLevelHeavy) { + return NO; + } + } + return YES; } - (BOOL)wantsWrite { if (self.paused) { return NO; } - if (self.jobManager.isReadOnly) { + if (!_testShouldWriteOverride && self.jobManager.isReadOnly) { return NO; } - [writeLock lock]; - const BOOL wantsWrite = [writeBuffer length] > 0; - [writeLock unlock]; - if (!wantsWrite) { + if (![self effectiveIoAllowed]) { return NO; } - return self.jobManager.ioAllowed; + [writeLock lock]; + BOOL hasData = [writeBuffer length] > 0; + [writeLock unlock]; + return hasData; } - (BOOL)hasOutput { @@ -880,6 +937,7 @@ - (void)writeToCoprocess:(NSData *)data { @synchronized (self) { [coprocess_.outputBuffer appendData:data]; } + [_ioHandler updateCoprocessWriteSourceState]; } // The bytes in data were just read from the fd. @@ -902,10 +960,13 @@ - (void)readTask:(char *)buffer length:(int)length { - (void)closeFileDescriptorAndDeregisterIfPossible { assert(self.jobManager); + // Tear down dispatch sources before closing the fd. If the fd is closed + // first, the sources remain active on a potentially reused descriptor. + [_ioHandler teardown]; const int fd = self.fd; if ([self.jobManager closeFileDescriptor]) { DLog(@"Deregister file descriptor %d for process %@ after closing it", fd, @(self.pid)); - [[TaskNotifier sharedInstance] deregisterTask:self]; + [PTYTask deregisterTaskFromNotifier:self]; } } @@ -920,6 +981,159 @@ - (void)loggingHelperStop:(iTermLoggingHelper *)loggingHelper { self.loggingHelper = nil; } +#pragma mark - Registration Routing + ++ (void)registerTaskWithNotifier:(id)task { + if ([iTermAdvancedSettingsModel usePerPTYDispatchSources]) { + __weak __typeof(task) weakTask = task; + dispatch_async(dispatch_get_main_queue(), ^{ + [weakTask didRegister]; + }); + } else { + [[TaskNotifier sharedInstance] registerTask:task]; + } +} + ++ (void)deregisterTaskFromNotifier:(id)task { + if ([iTermAdvancedSettingsModel usePerPTYDispatchSources]) { + pid_t pidToWaitOn = task.pidToWaitOn; + if (pidToWaitOn > 0) { + [[TaskNotifier sharedInstance] waitForPid:pidToWaitOn]; + } + if ([task hasCoprocess]) { + [[TaskNotifier sharedInstance] waitForPid:[[task coprocess] pid]]; + } + } else { + [[TaskNotifier sharedInstance] deregisterTask:task]; + } +} + +#pragma mark - Dispatch Source Forwarding + +- (void)updateReadSourceState { + [_ioHandler updateReadSourceState]; +} + +- (void)updateWriteSourceState { + [_ioHandler updateWriteSourceState]; +} + +- (void)writeBufferDidChange { + [_ioHandler writeBufferDidChange]; +} + +- (void)teardownDispatchSources { + [_ioHandler teardown]; +} + +#pragma mark - PTYTaskIOHandlerDelegate + +- (BOOL)ioHandlerShouldRead:(PTYTaskIOHandler *)handler { + return [self wantsRead]; +} + +- (BOOL)ioHandlerShouldWrite:(PTYTaskIOHandler *)handler { + return [self wantsWrite]; +} + +- (BOOL)ioHandlerShouldResumeCoprocessRead:(PTYTaskIOHandler *)handler { + @synchronized (self) { + return coprocess_ && ![coprocess_ eof] && [self writeBufferHasRoom]; + } +} + +- (BOOL)ioHandlerShouldResumeCoprocessWrite:(PTYTaskIOHandler *)handler { + @synchronized (self) { + return coprocess_ && [coprocess_ wantToWrite]; + } +} + +- (void)ioHandler:(PTYTaskIOHandler *)handler didReadData:(const char *)buffer length:(int)length { + hasOutput = YES; + [self readTask:(char *)buffer length:length]; +} + +- (void)ioHandlerDidDetectBrokenPipe:(PTYTaskIOHandler *)handler { + [self brokenPipe]; +} + +- (void)ioHandlerDrainWriteBuffer:(PTYTaskIOHandler *)handler { + [self processWrite]; +} + +- (void)ioHandlerHandleCoprocessRead:(PTYTaskIOHandler *)handler { + Coprocess *coprocess; + @synchronized (self) { + coprocess = coprocess_; + } + if (!coprocess || [coprocess eof]) { + return; + } + + [coprocess read]; + + if ([coprocess eof]) { + [self handleCoprocessEOF]; + return; + } + + NSData *data = [coprocess.inputBuffer copy]; + [coprocess.inputBuffer setLength:0]; + if (data.length > 0) { + [self writeTask:data coprocess:YES]; + } +} + +- (void)ioHandlerHandleCoprocessWrite:(PTYTaskIOHandler *)handler { + Coprocess *coprocess; + @synchronized (self) { + coprocess = coprocess_; + } + if (!coprocess || [coprocess eof]) { + return; + } + + @synchronized (self) { + [coprocess write]; + } +} + +// Called on ioQueue when the coprocess read source detects EOF. +// Similar to stopCoprocess but differs in two ways: +// 1. Bounces the delegate call to the main queue (we're on ioQueue here, +// whereas stopCoprocess runs on the main thread and calls synchronously). +// 2. Uses weak self for the async dispatch since the task may be deallocated +// between dispatch and execution. +- (void)handleCoprocessEOF { + Coprocess *coprocess; + @synchronized (self) { + coprocess = coprocess_; + } + if (!coprocess) { + return; + } + + pid_t pid = coprocess.pid; + [coprocess terminate]; + + @synchronized (self) { + coprocess_ = nil; + self.hasMuteCoprocess = NO; + } + + [_ioHandler teardownCoprocessSources]; + + if (pid > 0) { + [[TaskNotifier sharedInstance] waitForPid:pid]; + } + + __weak typeof(self) weakSelf = self; + dispatch_async(dispatch_get_main_queue(), ^{ + [weakSelf.delegate taskMuteCoprocessDidChange:weakSelf hasMuteCoprocess:NO]; + [[TaskNotifier sharedInstance] notifyCoprocessChange]; + }); +} + @end @implementation PTYTask(WinSizeControllerDelegate) @@ -942,3 +1156,112 @@ - (void)winSizeControllerSetGridSize:(VT100GridSize)gridSize } @end + +#pragma mark - Testing + +@implementation PTYTask (Testing) + +- (BOOL)testHasReadSource { + return _ioHandler.testHasReadSource; +} + +- (BOOL)testHasWriteSource { + return _ioHandler.testHasWriteSource; +} + +- (BOOL)testIsReadSourceSuspended { + return _ioHandler.testIsReadSourceSuspended; +} + +- (BOOL)testIsWriteSourceSuspended { + return _ioHandler.testIsWriteSourceSuspended; +} + +- (BOOL)testWriteBufferHasData { + [writeLock lock]; + BOOL hasData = [writeBuffer length] > 0; + [writeLock unlock]; + return hasData; +} + +- (void)testSetFd:(int)fd { + if (![self.jobManager isKindOfClass:[iTermLegacyJobManager class]]) { + self.jobManager = [[iTermLegacyJobManager alloc] initWithQueue:_jobManagerQueue]; + } + self.jobManager.fd = fd; +} + +- (void)testSetupDispatchSourcesForTesting { + [self testSetupDispatchSourcesForTestingWithPid:0]; +} + +- (void)testSetupDispatchSourcesForTestingWithPid:(pid_t)pid { + _ioHandler = [[PTYTaskIOHandler alloc] initWithFd:self.fd pid:pid]; + _ioHandler.delegate = self; + [_ioHandler start]; +} + +- (void)testTeardownDispatchSourcesForTesting { + [_ioHandler teardown]; + _ioHandler = nil; +} + +- (void)testAppendDataToWriteBuffer:(NSData *)data { + [writeLock lock]; + [writeBuffer appendData:data]; + [writeLock unlock]; +} + +- (BOOL)testShouldWriteOverride { + return _testShouldWriteOverride; +} + +- (void)setTestShouldWriteOverride:(BOOL)testShouldWriteOverride { + _testShouldWriteOverride = testShouldWriteOverride; +} + +- (NSNumber *)testIoAllowedOverride { + return _testIoAllowedOverride; +} + +- (void)setTestIoAllowedOverride:(NSNumber *)testIoAllowedOverride { + _testIoAllowedOverride = testIoAllowedOverride; +} + +- (void)testSetJobManager:(id)jobManager { + self.jobManager = jobManager; +} + +- (BOOL)testHasCoprocessReadSource { + return _ioHandler.testHasCoprocessReadSource; +} + +- (BOOL)testHasCoprocessWriteSource { + return _ioHandler.testHasCoprocessWriteSource; +} + +- (BOOL)testIsCoprocessReadSourceSuspended { + return _ioHandler.testIsCoprocessReadSourceSuspended; +} + +- (BOOL)testIsCoprocessWriteSourceSuspended { + return _ioHandler.testIsCoprocessWriteSourceSuspended; +} + +- (void)testSetupCoprocessSourcesWithReadFd:(int)readFd writeFd:(int)writeFd { + [_ioHandler setupCoprocessSourcesWithReadFd:readFd writeFd:writeFd]; +} + +- (void)testTeardownCoprocessSources { + [_ioHandler teardownCoprocessSources]; +} + +- (void)testWaitForIOQueue { + [_ioHandler testWaitForIOQueue]; +} + +- (void)testSimulateProcessExit { + [_ioHandler testSimulateProcessExit]; +} + +@end diff --git a/sources/PTYTaskIOHandler.swift b/sources/PTYTaskIOHandler.swift new file mode 100644 index 0000000000..30df914410 --- /dev/null +++ b/sources/PTYTaskIOHandler.swift @@ -0,0 +1,438 @@ +import Foundation + +/// Bytes per read chunk, matching MAXRW in PTYTask.m. +private let kMaxReadWrite: Int = 1024 + +// MARK: - Delegate Protocol + +/// Delegate protocol for PTYTaskIOHandler. PTYTask implements this to bridge +/// dispatch source events back to its internal state. +/// +/// Predicate methods are called from any queue and must be thread-safe. +/// Event methods are called on the handler's ioQueue. +@objc protocol PTYTaskIOHandlerDelegate: AnyObject { + // MARK: State predicates (any queue, thread-safe) + + /// Whether reading should be enabled. Checks paused, ioAllowed, backpressure. + func ioHandlerShouldRead(_ handler: PTYTaskIOHandler) -> Bool + + /// Whether writing should be enabled. Checks paused, ioAllowed, buffer has data. + func ioHandlerShouldWrite(_ handler: PTYTaskIOHandler) -> Bool + + /// Whether the coprocess read source should be resumed. + func ioHandlerShouldResumeCoprocessRead(_ handler: PTYTaskIOHandler) -> Bool + + /// Whether the coprocess write source should be resumed. + func ioHandlerShouldResumeCoprocessWrite(_ handler: PTYTaskIOHandler) -> Bool + + // MARK: PTY read events (ioQueue) + + /// Data was read from the PTY file descriptor. + func ioHandler(_ handler: PTYTaskIOHandler, didReadData buffer: UnsafePointer, length: Int32) + + /// A broken pipe (EOF or fatal read error) was detected on the PTY fd. + func ioHandlerDidDetectBrokenPipe(_ handler: PTYTaskIOHandler) + + // MARK: PTY write events (ioQueue) + + /// The write source fired; drain the write buffer now. + func ioHandlerDrainWriteBuffer(_ handler: PTYTaskIOHandler) + + // MARK: Coprocess events (ioQueue) + + /// The coprocess read source fired. Delegate should read from the coprocess, + /// handle EOF, and route data as needed. + func ioHandlerHandleCoprocessRead(_ handler: PTYTaskIOHandler) + + /// The coprocess write source fired. Delegate should flush coprocess output buffer. + func ioHandlerHandleCoprocessWrite(_ handler: PTYTaskIOHandler) +} + +// MARK: - PTYTaskIOHandler + +/// Manages dispatch sources for PTY I/O in the fairness scheduler path. +/// +/// Owns the ioQueue, read/write dispatch sources, and coprocess dispatch sources. +/// All source event handlers run on the serial ioQueue. The delegate (PTYTask) +/// provides state predicates and receives event callbacks. +/// +/// This class has no dependency on TaskNotifier. +@objc class PTYTaskIOHandler: NSObject { + + @objc weak var delegate: PTYTaskIOHandlerDelegate? + + /// The PTY file descriptor. Set at init, immutable. + private let fd: Int32 + + /// The child process PID for exit monitoring. 0 if unknown (e.g., tmux tasks). + private let childPid: pid_t + + /// Serial queue for all dispatch source handlers. + let ioQueue: DispatchQueue + private let ioQueueKey = DispatchSpecificKey() + + // MARK: Primary dispatch sources + + // Access on ioQueue only (after start) + private var readSource: DispatchSourceRead? + private var writeSource: DispatchSourceWrite? + + /// Monitors the child process for exit. When the read source is suspended + /// (for backpressure, pause, or copy mode), EOF delivery is blocked. The + /// proc source detects process exit independently and force-resumes the + /// read source so EOF can be delivered without polling. + // Access on ioQueue only (after start) + private var procSource: DispatchSourceProcess? + + // Access on ioQueue only + private var readSourceSuspended = true + private var writeSourceSuspended = true + + /// Set when the proc source fires. Once true, the read source is never + /// re-suspended — it must remain active to drain remaining data and + /// detect EOF. + // Access on ioQueue only + private var processExited = false + + // MARK: Coprocess dispatch sources + + // Access on ioQueue only (after setupCoprocessSources) + private var coprocessReadSource: DispatchSourceRead? + private var coprocessWriteSource: DispatchSourceWrite? + + // Access on ioQueue only + private var coprocessReadSourceSuspended = false + private var coprocessWriteSourceSuspended = false + + // MARK: - Init + + @objc init(fd: Int32, pid: pid_t) { + precondition(fd >= 0, "PTYTaskIOHandler requires a valid fd") + self.fd = fd + self.childPid = pid + self.ioQueue = DispatchQueue(label: "com.iterm2.pty-io") + super.init() + ioQueue.setSpecific(key: ioQueueKey, value: ()) + } + + // MARK: - Lifecycle + + /// Main queue. Creates read and write dispatch sources on the fd. + /// Sources start suspended; updateReadSourceState/updateWriteSourceState + /// resume them if conditions allow. + @objc func start() { + // Read source — starts suspended, resumed by updateReadSourceState when + // delegate says reading is allowed. Provides backpressure by suspending + // when the token pipeline is full. + let rs = DispatchSource.makeReadSource(fileDescriptor: fd, queue: ioQueue) + rs.setEventHandler { [weak self] in + self?.handleReadEvent() + } + rs.resume() // Must resume before we can suspend + rs.suspend() // Start suspended + readSource = rs + readSourceSuspended = true + + // Write source + let ws = DispatchSource.makeWriteSource(fileDescriptor: fd, queue: ioQueue) + ws.setEventHandler { [weak self] in + self?.handleWriteEvent() + } + ws.resume() + ws.suspend() + writeSource = ws + writeSourceSuspended = true + + // Process exit source — detects child exit so we can force-resume + // the read source for EOF delivery even when suspended for + // backpressure, pause, or copy mode. + if childPid > 0 { + let ps = DispatchSource.makeProcessSource( + identifier: childPid, + eventMask: .exit, + queue: ioQueue) + ps.setEventHandler { [weak self] in + self?.handleProcessExit() + } + ps.resume() + procSource = ps + } + + // Initial state sync + updateReadSourceState() + updateWriteSourceState() + } + + /// Any queue. Tears down all sources (primary + coprocess + proc). + @objc func teardown() { + teardownCoprocessSources() + syncOnIOQueue { [self] in + let ps = procSource + procSource = nil + // Proc source is never suspended — cancel directly. + ps?.cancel() + cancelAndNilSource(&readSource, suspended: &readSourceSuspended) + cancelAndNilSource(&writeSource, suspended: &writeSourceSuspended) + } + } + + // MARK: - State Updates (any queue) + + /// Any queue. Snapshots shouldRead from delegate, dispatches to ioQueue. + @objc func updateReadSourceState() { + // Never re-suspend after processExited: must drain remaining data and + // deliver EOF. For pid <= 0 (tmux), there is no proc source — but + // there is also no child process to exit. EOF arrives when the tmux + // server closes the fd; GCD queues that event and delivers it when + // the read source is next resumed (e.g., user unpauses). This matches + // legacy select() behavior where paused fds are omitted from the read set. + let shouldRead = delegate?.ioHandlerShouldRead(self) ?? false + ioQueue.async { [weak self] in + guard let self else { return } + self.setSourceSuspended(&self.readSource, + suspended: &self.readSourceSuspended, + shouldResume: shouldRead || self.processExited) + } + } + + /// Any queue. Snapshots shouldWrite from delegate, dispatches to ioQueue. + @objc func updateWriteSourceState() { + let shouldWrite = delegate?.ioHandlerShouldWrite(self) ?? false + ioQueue.async { [weak self] in + guard let self else { return } + self.setSourceSuspended(&self.writeSource, + suspended: &self.writeSourceSuspended, + shouldResume: shouldWrite) + } + } + + /// Any queue. Called when data is added to the write buffer. + @objc func writeBufferDidChange() { + updateWriteSourceState() + } + + // MARK: - Coprocess Source Management + + /// Sets up coprocess dispatch sources for the given file descriptors. + /// Requires start() to have been called first (ioQueue must exist). + @objc func setupCoprocessSources(readFd: Int32, writeFd: Int32) { + guard readFd >= 0, writeFd >= 0 else { return } + + // Tear down any existing coprocess sources before creating new ones. + teardownCoprocessSources() + + // Create sources outside ioQueue (just object construction), + // then assign references and flags on ioQueue to avoid racing + // with event handlers and updateCoprocess*SourceState. + + // Read source — reads coprocess stdout, feeds data back as PTY input + let crs = DispatchSource.makeReadSource(fileDescriptor: readFd, queue: ioQueue) + crs.setEventHandler { [weak self] in + self?.handleCoprocessReadEvent() + } + crs.resume() + crs.suspend() + + // Write source — flushes outputBuffer to coprocess stdin + let cws = DispatchSource.makeWriteSource(fileDescriptor: writeFd, queue: ioQueue) + cws.setEventHandler { [weak self] in + self?.handleCoprocessWriteEvent() + } + cws.resume() + cws.suspend() + + syncOnIOQueue { [self] in + coprocessReadSource = crs + coprocessReadSourceSuspended = true + coprocessWriteSource = cws + coprocessWriteSourceSuspended = true + } + + updateCoprocessReadSourceState() + updateCoprocessWriteSourceState() + } + + /// Any queue. Tears down coprocess dispatch sources. + @objc func teardownCoprocessSources() { + syncOnIOQueue { [self] in + cancelAndNilSource(&coprocessReadSource, suspended: &coprocessReadSourceSuspended) + cancelAndNilSource(&coprocessWriteSource, suspended: &coprocessWriteSourceSuspended) + } + } + + /// Any queue. Snapshots coprocess read predicate, dispatches to ioQueue. + @objc func updateCoprocessReadSourceState() { + let shouldResume = delegate?.ioHandlerShouldResumeCoprocessRead(self) ?? false + ioQueue.async { [weak self] in + guard let self else { return } + self.setSourceSuspended(&self.coprocessReadSource, + suspended: &self.coprocessReadSourceSuspended, + shouldResume: shouldResume) + } + } + + /// Any queue. Snapshots coprocess write predicate, dispatches to ioQueue. + @objc func updateCoprocessWriteSourceState() { + let shouldResume = delegate?.ioHandlerShouldResumeCoprocessWrite(self) ?? false + ioQueue.async { [weak self] in + guard let self else { return } + self.setSourceSuspended(&self.coprocessWriteSource, + suspended: &self.coprocessWriteSourceSuspended, + shouldResume: shouldResume) + } + } + + // MARK: - Private Event Handlers (ioQueue) + + /// The child process exited. Force-resume the read source so it can + /// drain remaining data and detect EOF, even if currently suspended for + /// backpressure, pause, or copy mode. + /// + /// Edge case: if the user is in copy mode when the process exits, we + /// resume the read source and deliver the broken pipe. This matches + /// legacy TaskNotifier behavior where select() detects EOF regardless + /// of pause state. + private func handleProcessExit() { + processExited = true + if readSourceSuspended, let rs = readSource { + rs.resume() + readSourceSuspended = false + } + } + + /// Read up to 4 * kMaxReadWrite bytes from the PTY fd per event. + private func handleReadEvent() { + let iterations = 4 + let bufferSize = kMaxReadWrite * iterations + let buffer = UnsafeMutablePointer.allocate(capacity: bufferSize) + defer { buffer.deallocate() } + + var totalBytesRead: Int = 0 + var gotEOF = false + + for _ in 0.. 0 { + delegate?.ioHandler(self, didReadData: buffer, length: Int32(totalBytesRead)) + // Re-check state after read (backpressure may have increased) + updateReadSourceState() + } + + if gotEOF { + delegate?.ioHandlerDidDetectBrokenPipe(self) + } + } + + /// Write source fired; delegate drains the write buffer. + private func handleWriteEvent() { + delegate?.ioHandlerDrainWriteBuffer(self) + // Re-check state after write (buffer may now be empty) + updateWriteSourceState() + // Write buffer shrank — coprocess read source may now be eligible to resume + updateCoprocessReadSourceState() + } + + /// Coprocess read source fired; delegate handles the I/O. + private func handleCoprocessReadEvent() { + delegate?.ioHandlerHandleCoprocessRead(self) + updateCoprocessReadSourceState() + } + + /// Coprocess write source fired; delegate flushes the output buffer. + private func handleCoprocessWriteEvent() { + delegate?.ioHandlerHandleCoprocessWrite(self) + updateCoprocessWriteSourceState() + } + + // MARK: - Private Helpers + + /// Whether we are currently executing on this handler's ioQueue. + private var isOnIOQueue: Bool { + DispatchQueue.getSpecific(key: ioQueueKey) != nil + } + + /// Run a block on ioQueue. Executes inline if already on ioQueue, + /// otherwise dispatches synchronously. + private func syncOnIOQueue(_ block: () -> Void) { + if isOnIOQueue { + block() + } else { + ioQueue.sync(execute: block) + } + } + + /// ioQueue only. Suspend or resume a source based on `shouldResume`. + /// No-op if the source is nil or already in the desired state. + private func setSourceSuspended(_ source: inout (some DispatchSourceProtocol)?, + suspended: inout Bool, + shouldResume: Bool) { + guard let s = source else { return } + if shouldResume && suspended { + s.resume() + suspended = false + } else if !shouldResume && !suspended { + s.suspend() + suspended = true + } + } + + /// ioQueue only. Resume a suspended source (per GCD rules), cancel it, + /// then nil the reference. + private func cancelAndNilSource(_ source: inout (some DispatchSourceProtocol)?, + suspended: inout Bool) { + guard let s = source else { return } + if suspended { + s.resume() + } + s.cancel() + source = nil + suspended = false + } +} + +// MARK: - Test Accessors + +extension PTYTaskIOHandler { + @objc var testHasReadSource: Bool { readSource != nil } + @objc var testHasWriteSource: Bool { writeSource != nil } + @objc var testIsReadSourceSuspended: Bool { readSourceSuspended } + @objc var testIsWriteSourceSuspended: Bool { writeSourceSuspended } + + @objc var testHasCoprocessReadSource: Bool { coprocessReadSource != nil } + @objc var testHasCoprocessWriteSource: Bool { coprocessWriteSource != nil } + @objc var testIsCoprocessReadSourceSuspended: Bool { coprocessReadSourceSuspended } + @objc var testIsCoprocessWriteSourceSuspended: Bool { coprocessWriteSourceSuspended } + + /// Synchronously wait for the ioQueue to drain all pending work. + @objc func testWaitForIOQueue() { + ioQueue.sync {} + } + + /// Simulates process exit for tests that use pipes instead of real child + /// processes. Dispatches handleProcessExit() on the ioQueue. + @objc func testSimulateProcessExit() { + ioQueue.async { [weak self] in + self?.handleProcessExit() + } + } +} diff --git a/sources/TokenArray.swift b/sources/TokenArray.swift index 75f5154819..ede19763db 100644 --- a/sources/TokenArray.swift +++ b/sources/TokenArray.swift @@ -26,6 +26,8 @@ class TokenArray: IteratorProtocol, CustomDebugStringConvertible { return DispatchQueue(label: "com.iterm2.token-destroyer") }() private var semaphore: DispatchSemaphore? + // Called when the semaphore is signaled (slot released back to backpressure pool). + private var onSemaphoreSignaled: (() -> Void)? var hasNext: Bool { return nextIndex < count @@ -53,16 +55,18 @@ class TokenArray: IteratorProtocol, CustomDebugStringConvertible { nextToken?.asciiData.pointee.buffer[0] == 13) } - // length is byte length ofinputs + // length is byte length of inputs init(_ cvector: CVector, lengthTotal: Int, lengthExcludingInBandSignaling: Int, - semaphore: DispatchSemaphore?) { + semaphore: DispatchSemaphore?, + onSemaphoreSignaled: (() -> Void)? = nil) { precondition(lengthTotal > 0 && lengthExcludingInBandSignaling >= 0) self.cvector = cvector self.lengthTotal = lengthTotal self.lengthExcludingInBandSignaling = lengthExcludingInBandSignaling self.semaphore = semaphore + self.onSemaphoreSignaled = onSemaphoreSignaled count = CVectorCount(&self.cvector) } @@ -74,7 +78,9 @@ class TokenArray: IteratorProtocol, CustomDebugStringConvertible { nextIndex += 1 if nextIndex == count, let semaphore = semaphore { semaphore.signal() + onSemaphoreSignaled?() self.semaphore = nil + self.onSemaphoreSignaled = nil } } return (CVectorGetObject(&cvector, nextIndex) as! VT100Token) @@ -99,7 +105,9 @@ class TokenArray: IteratorProtocol, CustomDebugStringConvertible { nextIndex += 1 if nextIndex == count, let semaphore = semaphore { semaphore.signal() + onSemaphoreSignaled?() self.semaphore = nil + self.onSemaphoreSignaled = nil } return hasNext } @@ -112,7 +120,9 @@ class TokenArray: IteratorProtocol, CustomDebugStringConvertible { nextIndex = count if let semaphore = semaphore { semaphore.signal() + onSemaphoreSignaled?() self.semaphore = nil + self.onSemaphoreSignaled = nil } } @@ -120,7 +130,9 @@ class TokenArray: IteratorProtocol, CustomDebugStringConvertible { func didFinish() { semaphore?.signal() + onSemaphoreSignaled?() semaphore = nil + onSemaphoreSignaled = nil } func cleanup(asyncFree: Bool) { @@ -129,6 +141,9 @@ class TokenArray: IteratorProtocol, CustomDebugStringConvertible { } dirty = false semaphore?.signal() + onSemaphoreSignaled?() + semaphore = nil + onSemaphoreSignaled = nil if asyncFree { TokenArray.destroyQueue.async { [cvector] in CVectorReleaseObjectsAndDestroy(cvector) diff --git a/sources/TokenExecutor.swift b/sources/TokenExecutor.swift index ea2423f9fc..70db5b1754 100644 --- a/sources/TokenExecutor.swift +++ b/sources/TokenExecutor.swift @@ -94,6 +94,21 @@ func CVectorReleaseObjectsAndDestroy(_ vector: CVector) { CVectorDestroy(&temp) } +// Indicates the current level of backpressure on token processing. +// Higher levels mean the mutation queue is falling behind. +// Conforms to Comparable for natural ordering comparisons. +@objc enum BackpressureLevel: Int, Comparable { + case none = 0 // > 75% slots available + case light = 1 // 50-75% available + case moderate = 2 // 25-50% available + case heavy = 3 // < 25% available + case blocked = 4 // 0 available, PTY read is blocked + + static func < (lhs: BackpressureLevel, rhs: BackpressureLevel) -> Bool { + return lhs.rawValue < rhs.rawValue + } +} + @objc(iTermTokenExecutor) class TokenExecutor: NSObject { @objc weak var delegate: TokenExecutorDelegate? { @@ -101,6 +116,14 @@ class TokenExecutor: NSObject { impl.delegate = delegate } } + private let totalSlots = Int(iTermAdvancedSettingsModel.bufferDepth()) + // Initialize counter to totalSlots immediately (not in init) to avoid a window + // where backpressureLevel could return .blocked before init completes. + private var availableSlots = { + let counter = iTermAtomicInt64Create() + iTermAtomicInt64Add(counter, Int64(iTermAdvancedSettingsModel.bufferDepth())) + return counter + }() private let semaphore = DispatchSemaphore(value: Int(iTermAdvancedSettingsModel.bufferDepth())) private let impl: TokenExecutorImpl private let queue: DispatchQueue @@ -108,6 +131,15 @@ class TokenExecutor: NSObject { private var onExecutorQueue: Bool { return DispatchQueue.getSpecific(key: Self.isTokenExecutorSpecificKey) == true } + + /// Closure called when backpressure transitions from heavy to lighter. + /// Used by PTYTask to re-evaluate read source state. + @objc var backpressureReleaseHandler: (() -> Void)? { + didSet { + impl.backpressureReleaseHandler = backpressureReleaseHandler + } + } + @objc var isBackgroundSession = false { didSet { #if DEBUG @@ -138,6 +170,32 @@ class TokenExecutor: NSObject { queue: queue) } + // Returns the current backpressure level based on available slots. + // This can be called from any queue. + // + // NOTE: High-priority tokens bypass backpressure (no semaphore wait), so this + // metric only reflects load from normal PTY token processing. This is intentional: + // high-priority tokens are meant to bypass flow control. + @objc var backpressureLevel: BackpressureLevel { + let available = Int(iTermAtomicInt64Get(availableSlots)) + // Use <= 0 since availableSlots can go negative when more tokens are added + // than total capacity (the counter isn't clamped at 0) + if available <= 0 { + return .blocked + } + let ratio = Double(available) / Double(totalSlots) + switch ratio { + case ..<0.25: + return .heavy + case ..<0.50: + return .moderate + case ..<0.75: + return .light + default: + return .none + } + } + // This takes ownership of vector. // You can call this on any queue. @objc @@ -183,12 +241,14 @@ class TokenExecutor: NSObject { semaphore: nil as DispatchSemaphore?) return } - // Normal code path for tokens from PTY. Use the semaphore to give backpressure to reading. + // Block on semaphore for backpressure to the reading thread (TaskNotifier or ioQueue). let semaphore = self.semaphore if enableTimingStats { TokenExecutor.addTokensTimingStats.recordEnd() } _ = semaphore.wait(timeout: .distantFuture) + // Track that we've consumed a slot for backpressure monitoring + iTermAtomicInt64Add(availableSlots, -1) if enableTimingStats { TokenExecutor.addTokensTimingStats.recordStart() } @@ -249,10 +309,28 @@ class TokenExecutor: NSObject { lengthExcludingInBandSignaling: Int, highPriority: Bool, semaphore: DispatchSemaphore?) { + // When a slot is released (token array consumed), increment the available slots counter + // and notify if backpressure has eased. This fires for all non-high-priority tokens + // regardless of whether a semaphore is used (legacy path) or not (dispatch sources path). + // High-priority tokens bypass backpressure accounting entirely. + let onSemaphoreSignaled: (() -> Void)? + if !highPriority { + onSemaphoreSignaled = { [weak self, availableSlots] in + guard let self = self else { return } + let newValue = iTermAtomicInt64Add(availableSlots, 1) + // Notify when crossing out of heavy backpressure + if newValue > 0 && self.backpressureLevel < .heavy { + self.impl.backpressureReleaseHandler?() + } + } + } else { + onSemaphoreSignaled = nil + } let tokenArray = TokenArray(vector, lengthTotal: lengthTotal, lengthExcludingInBandSignaling: lengthExcludingInBandSignaling, - semaphore: semaphore) + semaphore: semaphore, + onSemaphoreSignaled: onSemaphoreSignaled) self.impl.addTokens(tokenArray, highPriority: highPriority) } @@ -329,6 +407,9 @@ private class TokenExecutorImpl { private(set) var isExecutingToken = false weak var delegate: TokenExecutorDelegate? + /// Closure called when backpressure transitions from heavy to lighter. + var backpressureReleaseHandler: (() -> Void)? + // This is used to give visible sessions priority for token processing over those that cannot // be seen. This prevents a very busy non-selected tab from starving a visible one. private static var activeSessionsWithTokens = MutableAtomicObject>(Set()) diff --git a/sources/iTerm2SharedARC-Bridging-Header.h b/sources/iTerm2SharedARC-Bridging-Header.h index ef195622bc..1ef1571db8 100644 --- a/sources/iTerm2SharedARC-Bridging-Header.h +++ b/sources/iTerm2SharedARC-Bridging-Header.h @@ -186,6 +186,7 @@ #import "PTYSession.h" #import "PTYSession+ARC.h" #import "PTYTask.h" +#import "PTYTask+Private.h" #import "PTYTextView+ARC.h" #import "PTYTextView.h" #import "PTYTextView+Private.h" diff --git a/sources/iTermAdvancedSettingsModel.h b/sources/iTermAdvancedSettingsModel.h index 4436cc17e5..6ab59f50d3 100644 --- a/sources/iTermAdvancedSettingsModel.h +++ b/sources/iTermAdvancedSettingsModel.h @@ -375,6 +375,7 @@ extern NSString *const iTermAdvancedSettingsDidChange; + (BOOL)revealExportedSettingsAndData; + (BOOL)rightJustifyRTLLines; + (BOOL)runJobsInServers; ++ (BOOL)usePerPTYDispatchSources; + (BOOL)saveToPasteHistoryWhenSecureInputEnabled; + (double)scrollWheelAcceleration; + (NSString *)searchCommand; diff --git a/sources/iTermAdvancedSettingsModel.m b/sources/iTermAdvancedSettingsModel.m index ad9d9fac50..92fc245bdd 100644 --- a/sources/iTermAdvancedSettingsModel.m +++ b/sources/iTermAdvancedSettingsModel.m @@ -603,6 +603,7 @@ + (BOOL)settingIsDeprecated:(NSString *)name { DEFINE_BOOL(bootstrapDaemon, YES, SECTION_SESSION @"Allow sessions to survive logging out and back in.\nThis breaks the “auth sufficient pam_tid.so” hack some people use to allow sudo to authenticate with Touch ID."); DEFINE_BOOL(killJobsInServersOnQuit, YES, SECTION_SESSION @"User-initiated Quit (⌘Q) of iTerm2 will kill all running jobs.\nApplies only when session restoration is on."); +DEFINE_BOOL(usePerPTYDispatchSources, NO, SECTION_SESSION @"Use per-PTY dispatch sources instead of a shared select() loop for I/O.\nThis provides better session isolation but is experimental.\nRequires restart."); DEFINE_SETTABLE_BOOL(suppressRestartAnnouncement, SuppressRestartAnnouncement, NO, SECTION_SESSION @"Suppress the Restart Session offer.\nWhen a session terminates, it will offer to restart itself. Turn this on to suppress the offer permanently."); DEFINE_BOOL(showSessionRestoredBanner, YES, SECTION_SESSION @"When restoring a session without restoring a running job, draw a banner saying “Session Contents Restored” below the restored contents."); DEFINE_DEPRECATED_STRING(autoLogFormat, diff --git a/sources/iTermLegacyJobManager.m b/sources/iTermLegacyJobManager.m index 08bbc723b1..8ccfd028aa 100644 --- a/sources/iTermLegacyJobManager.m +++ b/sources/iTermLegacyJobManager.m @@ -10,6 +10,7 @@ #import "DebugLogging.h" #import "iTermProcessCache.h" #import "NSArray+iTerm.h" +#import "PTYTask.h" #import "PTYTask+MRR.h" #import "TaskNotifier.h" @@ -89,7 +90,7 @@ - (void)forkAndExecWithTtyState:(iTermTTYState)ttyState }); if (status == iTermJobManagerForkAndExecStatusSuccess) { DLog(@"Register task for pid %@", @(self.childPid)); - [[TaskNotifier sharedInstance] registerTask:task]; + [PTYTask registerTaskWithNotifier:task]; } if (completion) { completion(status, nil); diff --git a/sources/iTermMonoServerJobManager.m b/sources/iTermMonoServerJobManager.m index 631f3fa28d..bc06b86c7e 100644 --- a/sources/iTermMonoServerJobManager.m +++ b/sources/iTermMonoServerJobManager.m @@ -14,6 +14,7 @@ #import "iTermProcessCache.h" #import "NSArray+iTerm.h" #import "NSWorkspace+iTerm.h" +#import "PTYTask.h" #import "PTYTask+MRR.h" #import "TaskNotifier.h" @@ -95,7 +96,7 @@ - (void)forkAndExecWithTtyState:(iTermTTYState)ttyState NSNumber *optionalErrorCode) { if (status == iTermJobManagerForkAndExecStatusSuccess) { DLog(@"Register %@ after fork and exec", @(task.pid)); - [[TaskNotifier sharedInstance] registerTask:task]; + [PTYTask registerTaskWithNotifier:task]; } completion(status, optionalErrorCode); }; @@ -281,7 +282,7 @@ - (iTermJobManagerAttachResults)attachToServer:(iTermGeneralServerConnection)ser [self queueAttachToServer:serverConnection task:task]; }); DLog(@"Register task for %@", pidNumber); - [[TaskNotifier sharedInstance] registerTask:task]; + [PTYTask registerTaskWithNotifier:task]; if (pidNumber == nil) { return results; } diff --git a/sources/iTermMultiServerJobManager.m b/sources/iTermMultiServerJobManager.m index e0b7418484..69551e6a1c 100644 --- a/sources/iTermMultiServerJobManager.m +++ b/sources/iTermMultiServerJobManager.m @@ -16,6 +16,7 @@ #import "NSArray+iTerm.h" #import "NSFileManager+iTerm.h" #import "NSObject+iTerm.h" +#import "PTYTask.h" #import "TaskNotifier.h" NSString *const iTermMultiServerRestorationKeyType = @"Type"; @@ -180,7 +181,7 @@ - (void)queueForkAndExecWithForkRequest:(iTermMultiServerJobManagerForkRequest)f dispatch_async(dispatch_get_main_queue(), ^{ [[iTermProcessCache sharedInstance] registerTrackedPID:child.pid]; DLog(@"Register %@ after server successfully execs job", @(child.pid)); - [[TaskNotifier sharedInstance] registerTask:task]; + [PTYTask registerTaskWithNotifier:task]; [[iTermProcessCache sharedInstance] setNeedsUpdate:YES]; completion(iTermJobManagerForkAndExecStatusSuccess, nil); }); @@ -430,7 +431,7 @@ - (void)didAttachToProcess:(pid_t)pid task:(id)task state:(iTermMainT [[iTermProcessCache sharedInstance] registerTrackedPID:pid]; DLog(@"Register task %@ after attaching", @(pid)); - [[TaskNotifier sharedInstance] registerTask:task]; + [PTYTask registerTaskWithNotifier:task]; [[iTermProcessCache sharedInstance] setNeedsUpdate:YES]; }