Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
40452b7
test: cover ssh pty queued terminal replies
austinywang Jun 14, 2026
0ad3258
fix: drop queued ssh pty probe replies on restore
austinywang Jun 14, 2026
40c21e0
Merge remote-tracking branch 'origin/main' into issue-6061-remote-res…
austinywang Jun 14, 2026
c225d16
fix: keep ssh pty reconnect filter out of long files
austinywang Jun 14, 2026
ba6dca7
fix: avoid unchecked sendable in ssh pty filter
austinywang Jun 14, 2026
5202422
fix: pass through ambiguous ssh pty escape input
austinywang Jun 14, 2026
d32cbb0
test: cover ssh pty reconnect filter boundaries
austinywang Jun 14, 2026
804a5e0
fix: keep reconnect probe filter active across reads
austinywang Jun 14, 2026
7b19617
fix: buffer reconnect probe escape prefix
austinywang Jun 14, 2026
850a0bf
fix: flush bare escape after reconnect drain
austinywang Jun 14, 2026
2d3069c
fix: bound reconnect probe reply drain
austinywang Jun 14, 2026
78052cb
fix: limit reconnect probe filter to terminal stdin
austinywang Jun 14, 2026
944128d
test: clean reconnect filter policy findings
austinywang Jun 14, 2026
798a105
fix: filter OSC 12 reconnect probe replies
austinywang Jun 14, 2026
1c0be37
merge: sync with main
austinywang Jun 14, 2026
a827288
fix: drain reconnect probes before relaying output
austinywang Jun 14, 2026
1a4867a
merge: sync with main
austinywang Jun 14, 2026
4e6ef6c
fix: keep reconnect probe filtering until bridge output
austinywang Jun 14, 2026
88132a3
Merge remote-tracking branch 'origin/main' into issue-6061-remote-res…
austinywang Jun 14, 2026
6a6bec4
Merge remote-tracking branch 'origin/main' into issue-6061-remote-res…
austinywang Jun 14, 2026
53f675e
Merge remote-tracking branch 'origin/main' into issue-6061-remote-res…
austinywang Jun 14, 2026
6d9eca7
merge: resolve conflicts with origin/main
austinywang Jun 14, 2026
be8c2c6
fix: stop reconnect input filtering after bridge output
austinywang Jun 14, 2026
3841ee6
Merge remote-tracking branch 'origin/main' into issue-6061-remote-res…
austinywang Jun 14, 2026
5ecc4f3
chore: refresh swift file length budget
austinywang Jun 14, 2026
3c6f022
fix: signal reconnect filter stop without shared lock
austinywang Jun 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 224 additions & 1 deletion CLI/cmux.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10075,6 +10075,215 @@ struct CMUXCLI {
}
}

// Confined to the single ssh-pty-attach stdin pump thread; the class is
// captured by DispatchQueue's @Sendable closure but never shared elsewhere.
private final class SSHPTYAttachReconnectInputFilter: @unchecked Sendable {
Comment thread
austinywang marked this conversation as resolved.
Outdated
private enum SequenceMatch {
case strip(length: Int)
case incomplete
case passThrough
}

private static let escape: UInt8 = 0x1B
private static let bell: UInt8 = 0x07
private static let leftBracket: UInt8 = 0x5B
private static let rightBracket: UInt8 = 0x5D
private static let backslash: UInt8 = 0x5C
private static let semicolon: UInt8 = 0x3B
private static let questionMark: UInt8 = 0x3F
private static let greaterThan: UInt8 = 0x3E
private static let dollar: UInt8 = 0x24

private var isFiltering: Bool
private var pending = [UInt8]()

init(enabled: Bool) {
isFiltering = enabled
}

func filter(_ data: Data) -> Data {
guard isFiltering, !data.isEmpty else {
return data
}

var bytes = pending
pending.removeAll(keepingCapacity: true)
bytes.append(contentsOf: data)

var output = Data()
var index = 0
while index < bytes.count {
guard bytes[index] == Self.escape else {
isFiltering = false
output.append(contentsOf: bytes[index...])
return output
}

switch Self.reconnectProbeReplySequence(in: bytes, at: index) {
case .strip(let length):
index += length
case .incomplete:
pending.append(contentsOf: bytes[index...])
return output
case .passThrough:
isFiltering = false
output.append(contentsOf: bytes[index...])
return output
}
}

return output
}

func finish() -> Data {
guard !pending.isEmpty else {
return Data()
}
let data = Data(pending)
pending.removeAll(keepingCapacity: false)
return data
}

private static func reconnectProbeReplySequence(in bytes: [UInt8], at start: Int) -> SequenceMatch {
guard start < bytes.count, bytes[start] == escape else {
return .passThrough
}
guard start + 1 < bytes.count else {
return .incomplete
}

switch bytes[start + 1] {
case rightBracket:
return oscColorReplySequence(in: bytes, at: start)
case leftBracket:
return csiProbeReplySequence(in: bytes, at: start)
default:
return .passThrough
}
}

private static func oscColorReplySequence(in bytes: [UInt8], at start: Int) -> SequenceMatch {
var cursor = start + 2
var command = [UInt8]()

while cursor < bytes.count {
let byte = bytes[cursor]
if byte == semicolon {
break
}
if byte < 0x30 || byte > 0x39 || command.count >= 2 {
return .passThrough
}
command.append(byte)
cursor += 1
}

guard cursor < bytes.count else {
return oscCommandCouldBecomeColorReply(command) ? .incomplete : .passThrough
}
guard bytes[cursor] == semicolon else {
return .passThrough
}
guard command == [0x31, 0x30] || command == [0x31, 0x31] else {
return .passThrough
}

cursor += 1
while cursor < bytes.count {
let byte = bytes[cursor]
if byte == bell {
return .strip(length: cursor - start + 1)
}
if byte == escape {
guard cursor + 1 < bytes.count else {
return .incomplete
}
if bytes[cursor + 1] == backslash {
return .strip(length: cursor - start + 2)
}
}
cursor += 1
}
return .incomplete
}

private static func oscCommandCouldBecomeColorReply(_ command: [UInt8]) -> Bool {
if command.isEmpty {
return true
}
if command == [0x31] {
return true
}
return command == [0x31, 0x30] || command == [0x31, 0x31]
}

private static func csiProbeReplySequence(in bytes: [UInt8], at start: Int) -> SequenceMatch {
var cursor = start + 2
while cursor < bytes.count {
let byte = bytes[cursor]
if byte >= 0x40, byte <= 0x7E {
return shouldStripCSIReply(bytes: bytes, bodyStart: start + 2, finalIndex: cursor)
? .strip(length: cursor - start + 1)
: .passThrough
}
guard byte >= 0x20, byte <= 0x3F else {
return .passThrough
}
cursor += 1
}
return .incomplete
}

private static func shouldStripCSIReply(bytes: [UInt8], bodyStart: Int, finalIndex: Int) -> Bool {
var parameterEnd = bodyStart
while parameterEnd < finalIndex, bytes[parameterEnd] >= 0x30, bytes[parameterEnd] <= 0x3F {
parameterEnd += 1
}
guard bytes[parameterEnd..<finalIndex].allSatisfy({ $0 >= 0x20 && $0 <= 0x2F }) else {
return false
}

let parameters = bytes[bodyStart..<parameterEnd]
let intermediates = bytes[parameterEnd..<finalIndex]
let final = bytes[finalIndex]

switch final {
case 0x52, 0x63, 0x6E:
return intermediates.isEmpty
case 0x75:
guard intermediates.isEmpty else { return false }
if parameters.contains(questionMark) {
return true
}
return firstNumericCSIParameter(parameters).map { $0 <= 16 } ?? false
case 0x79:
return intermediates.elementsEqual([dollar])
default:
return false
}
}

private static func firstNumericCSIParameter(_ parameters: ArraySlice<UInt8>) -> Int? {
var value = 0
var sawDigit = false
for byte in parameters {
if byte >= 0x30, byte <= 0x39 {
sawDigit = true
value = min((value * 10) + Int(byte - 0x30), 1_000_000)
continue
}
if byte == semicolon {
break
}
if byte == questionMark || byte == greaterThan {
continue
}
return nil
}
return sawDigit ? value : nil
}
}

private func runSSHSessionList(
commandArgs: [String],
client: SocketClient,
Expand Down Expand Up @@ -10590,18 +10799,32 @@ struct CMUXCLI {
)
defer { resizeSource.cancel() }

let reconnectInputFilter = SSHPTYAttachReconnectInputFilter(enabled: requireExisting && command == nil)
DispatchQueue.global(qos: .userInteractive).async {
var buffer = [UInt8](repeating: 0, count: 8192)
while true {
let count = Darwin.read(STDIN_FILENO, &buffer, buffer.count)
if count > 0 {
let input = reconnectInputFilter.filter(Data(buffer.prefix(count)))
guard !input.isEmpty else {
continue
}
do {
try self.writeAll(fd: fd, data: Data(buffer.prefix(count)))
try self.writeAll(fd: fd, data: input)
} catch {
_ = shutdown(fd, SHUT_WR)
return
}
} else if count == 0 {
let input = reconnectInputFilter.finish()
if !input.isEmpty {
do {
try self.writeAll(fd: fd, data: input)
} catch {
_ = shutdown(fd, SHUT_WR)
return
}
}
_ = shutdown(fd, SHUT_WR)
return
} else if errno != EINTR {
Expand Down
108 changes: 108 additions & 0 deletions cmuxTests/CLINotifyProcessIntegrationRegressionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3837,6 +3837,7 @@ final class CLINotifyProcessIntegrationRegressionTests: XCTestCase {
}
switch method {
case "workspace.remote.pty_bridge":
XCTAssertEqual((payload["params"] as? [String: Any])?["require_existing"] as? Bool, true)
return self.v2Response(
id: id,
ok: true,
Expand Down Expand Up @@ -3889,6 +3890,7 @@ final class CLINotifyProcessIntegrationRegressionTests: XCTestCase {
executablePath: cliPath,
arguments: [
"ssh-pty-attach",
"--require-existing",
"--workspace", workspaceId,
"--session-id", sessionId,
"--attachment-id", surfaceId,
Expand Down Expand Up @@ -4271,6 +4273,112 @@ final class CLINotifyProcessIntegrationRegressionTests: XCTestCase {
XCTAssertEqual(capturedHandshake?["rows"] as? Int, 43)
}

func testSSHPTYAttachDropsQueuedTerminalProbeRepliesBeforeForwardingInput() throws {
let cliPath = try bundledCLIPath()
let socketPath = makeSocketPath("sshptyprobe")
let listenerFD = try bindUnixSocket(at: socketPath)
let bridge = try bindLoopbackTCP()
let state = MockSocketServerState()
let workspaceId = "22222222-2222-2222-2222-222222222222"
let surfaceId = "33333333-3333-3333-3333-333333333333"
let sessionId = "ssh-\(workspaceId)-\(surfaceId)"
let token = "bridge-token"
let bridgeInput = MockBridgeInputCapture()

defer {
Darwin.close(listenerFD)
Darwin.close(bridge.fd)
unlink(socketPath)
}

let socketHandled = startMockServer(listenerFD: listenerFD, state: state) { line in
guard let payload = self.jsonObject(line),
let id = payload["id"] as? String,
let method = payload["method"] as? String else {
return self.malformedRequestResponse(raw: line)
}
switch method {
case "workspace.remote.pty_bridge":
return self.v2Response(
id: id,
ok: true,
result: [
"host": "127.0.0.1",
"port": bridge.port,
"token": token,
"session_id": sessionId,
"attachment_id": surfaceId,
]
)
case "workspace.remote.pty_sessions":
return self.v2Response(id: id, ok: true, result: ["sessions": []])
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
case "workspace.remote.pty_attach_end":
return self.v2Response(
id: id,
ok: true,
result: [
"workspace_id": workspaceId,
"surface_id": surfaceId,
"session_id": sessionId,
"cleared_remote_pty_session": true,
]
)
default:
return self.v2Response(
id: id,
ok: false,
error: ["code": "unexpected_method", "message": "Unexpected method \(method)"]
)
}
}
let bridgeHandled = startBridgeReadyCapturingInputUntilEOF(
listenerFD: bridge.fd,
capture: bridgeInput
)

var environment = ProcessInfo.processInfo.environment
environment["CMUX_SOCKET_PATH"] = socketPath
environment["CMUX_CLI_SENTRY_DISABLED"] = "1"

let queuedProbeReplies =
"\u{1B}]11;rgb:e5e5/e9e9/f0f0\u{1B}\\" +
"\u{1B}]10;rgb:4141/4848/5858\u{07}" +
"\u{1B}[1;1R" +
"\u{1B}[?1;2c" +
"\u{1B}[?0u" +
"\u{1B}[?12;2$y"
let forwardedInput = "printf keep\n"
let result = runProcess(
executablePath: cliPath,
arguments: [
"ssh-pty-attach",
"--workspace", workspaceId,
"--session-id", sessionId,
"--attachment-id", surfaceId,
],
Comment thread
austinywang marked this conversation as resolved.
Outdated
environment: environment,
standardInput: queuedProbeReplies + forwardedInput,
timeout: 5
)

wait(for: [socketHandled, bridgeHandled], timeout: 5)
XCTAssertFalse(result.timedOut, result.stderr)
XCTAssertEqual(result.status, 0, result.stderr)
XCTAssertTrue(result.stderr.isEmpty, result.stderr)
let forwardedBridgeInput = bridgeInput.snapshot()
XCTAssertEqual(
String(data: forwardedBridgeInput, encoding: .utf8),
forwardedInput,
"Terminal probe replies queued during reconnect must not be forwarded into the remote PTY."
)
let methods = state.snapshot().compactMap { self.jsonObject($0)?["method"] as? String }
XCTAssertEqual(methods, [
"workspace.remote.pty_bridge",
"workspace.remote.pty_sessions",
"workspace.remote.pty_attach_end",
])
}

func testSSHPTYAttachSerializesResizeBeforeEOFLocalCleanup() throws {
let cliPath = try bundledCLIPath()
let socketPath = makeSocketPath("sshptyresize")
Expand Down
Loading
Loading