From f2651bcbe10af13a38c96d11cd8d02efa8892947 Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 00:05:34 +0530 Subject: [PATCH 01/13] 1 --- .../Sources/IMessage/DatabaseTickWaits.swift | 125 ++++++++++++++ .../Sources/IMessage/PlatformAPI.swift | 109 +++++------- src/IMessage/Sources/IMessageCore/Topic.swift | 35 ++-- .../IMessageTests/DatabaseTickWaitTests.swift | 160 ++++++++++++++++++ 4 files changed, 354 insertions(+), 75 deletions(-) create mode 100644 src/IMessage/Sources/IMessage/DatabaseTickWaits.swift create mode 100644 src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift diff --git a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift new file mode 100644 index 00000000..c1f626d9 --- /dev/null +++ b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift @@ -0,0 +1,125 @@ +import Foundation +import IMDatabase +import IMessageCore +import PlatformSDK + +private let sentMessageLinkWaitTimeout: TimeInterval = 1.5 + +enum DatabaseTickWaits { + typealias SentMessageID = (rowID: Int, guid: String) + + static func sentMessageIDs( + text: String?, + timeout: TimeInterval, + changes: Topic, + linkTimeout: TimeInterval = sentMessageLinkWaitTimeout, + querySentMessageIDs: @escaping @Sendable () throws -> [SentMessageID] + ) async throws -> [SentMessageID] { + let startedAt = Date() + let timeoutDeadline = startedAt.addingTimeInterval(timeout) + let linkDeadline = startedAt.addingTimeInterval(linkTimeout) + let expectedNewMessageIDCount = text.map { max($0.linkCount, 1) } ?? 1 + + while true { + let changeStream = changes.subscribe() + let sentMessageIDs = try querySentMessageIDs() + if sentMessageIDs.count == expectedNewMessageIDCount { + return sentMessageIDs + } + if text != nil, !sentMessageIDs.isEmpty, Date() >= linkDeadline { + return sentMessageIDs + } + if Date() >= timeoutDeadline { + throw ErrorMessage("timed out waiting for sent messages") + } + + let wakeDeadline: Date + if text != nil, !sentMessageIDs.isEmpty { + wakeDeadline = min(timeoutDeadline, linkDeadline) + } else { + wakeDeadline = timeoutDeadline + } + _ = try await waitForChange(on: changeStream, until: wakeDeadline) + } + } + + static func sentThreadIDs( + timeout: TimeInterval, + changes: Topic, + querySentThreadIDs: @escaping @Sendable () throws -> [String?] + ) async throws -> [String?] { + let deadline = Date().addingTimeInterval(timeout) + + while true { + let changeStream = changes.subscribe() + let threadIDs = try querySentThreadIDs() + if !threadIDs.contains(where: { $0 == nil }) || Date() >= deadline { + return threadIDs + } + + _ = try await waitForChange(on: changeStream, until: deadline) + } + } + + static func loadedAttachment( + messageID: String, + timeout: TimeInterval, + changes: Topic, + loadMessage: @escaping @Sendable () async throws -> PlatformSDK.Message?, + terminalAttachmentFailureState: @escaping @Sendable () async throws -> Attachment.IMFileTransferState? + ) async throws -> PlatformSDK.Message { + let deadline = Date().addingTimeInterval(timeout) + var isFirstRead = true + + while true { + let changeStream = changes.subscribe() + let message = try await loadMessage() + .orThrow(ErrorMessage("Could not find message \(messageID)")) + let attachments = message.attachments ?? [] + if isFirstRead { + guard !attachments.isEmpty else { + throw ErrorMessage("Message \(messageID) has no attachments") + } + isFirstRead = false + } + if !attachments.isEmpty, !attachments.contains(where: { $0.loading == true }) { + return message + } + + if let failureState = try await terminalAttachmentFailureState() { + throw ErrorMessage("Attachment in message \(messageID) failed to load (transfer state: \(failureState.rawValue))") + } + + guard Date() < deadline else { + throw ErrorMessage("Timed out waiting for attachment in message \(messageID) to load") + } + + _ = try await waitForChange(on: changeStream, until: deadline) + } + } + + private static func waitForChange(on stream: AsyncStream, until deadline: Date) async throws -> Bool { + let remainingTime = deadline.timeIntervalSinceNow + guard remainingTime > 0 else { return false } + + return try await withThrowingTaskGroup(of: Bool.self) { group in + group.addTask { + var iterator = stream.makeAsyncIterator() + return await iterator.next() != nil + } + group.addTask { + try await Task.sleep(forTimeInterval: remainingTime) + return false + } + + do { + let changed = try await group.next() ?? false + group.cancelAll() + return changed + } catch { + group.cancelAll() + throw error + } + } + } +} diff --git a/src/IMessage/Sources/IMessage/PlatformAPI.swift b/src/IMessage/Sources/IMessage/PlatformAPI.swift index e69b615b..6d69e07b 100644 --- a/src/IMessage/Sources/IMessage/PlatformAPI.swift +++ b/src/IMessage/Sources/IMessage/PlatformAPI.swift @@ -8,27 +8,36 @@ private let messagePageLimit = 20 private let platformLog = Logger(imessageLabel: "platform-api") private let messageSendTimeout: TimeInterval = 45 private let reactionSendTimeout: TimeInterval = 5 -private let waitForLinksTimeout: TimeInterval = 1.5 private let waitForSentThreadTimeout: TimeInterval = 10 -private let sentMessagePollInterval: TimeInterval = 0.025 private let loadAttachmentTimeout: TimeInterval = 60 -private let loadAttachmentInitialPollInterval: TimeInterval = 0.25 -private let loadAttachmentMaxPollInterval: TimeInterval = 5 private final class PlatformAPIDatabase: @unchecked Sendable { - private let database = Protected() + private let state = Protected(State()) func withDatabase(_ action: (IMDatabase) throws -> T) throws -> T { - try database.withLock { cachedDatabase in - if let cachedDatabase { - return try action(cachedDatabase) - } + try state.withLock { state in + let db = try state.database ?? IMDatabase(createIndexes: true) + state.database = db + return try action(db) + } + } - let newDatabase = try IMDatabase(createIndexes: true) - cachedDatabase = newDatabase - return try action(newDatabase) + func changeTopic() throws -> Topic { + try state.withLock { state in + let db = try state.database ?? IMDatabase(createIndexes: true) + state.database = db + if !state.isListeningForChanges { + try db.beginListeningForChanges() + state.isListeningForChanges = true + } + return db.changes } } + + private struct State { + var database: IMDatabase? + var isListeningForChanges = false + } } /// Process-wide PlatformAPI facade. @@ -410,36 +419,18 @@ public final class PlatformAPI { messageID: String, timeout: TimeInterval ) async throws -> PlatformSDK.Message { - let deadline = Date().addingTimeInterval(timeout) - var pollInterval = loadAttachmentInitialPollInterval - var isFirstRead = true - - while true { - let message = try await getMessage(threadID: threadID, messageID: messageID) - .orThrow(ErrorMessage("Could not find message \(messageID)")) - let attachments = message.attachments ?? [] - if isFirstRead { - guard !attachments.isEmpty else { - throw ErrorMessage("Message \(messageID) has no attachments") - } - isFirstRead = false - } - if !attachments.isEmpty, !attachments.contains(where: { $0.loading == true }) { - return message - } - - if let failureState = try await terminalAttachmentFailureState(messageID: messageID) { - throw ErrorMessage("Attachment in message \(messageID) failed to load (transfer state: \(failureState.rawValue))") - } - - let remainingTime = deadline.timeIntervalSinceNow - guard remainingTime > 0 else { - throw ErrorMessage("Timed out waiting for attachment in message \(messageID) to load") + let changes = try database.changeTopic() + return try await DatabaseTickWaits.loadedAttachment( + messageID: messageID, + timeout: timeout, + changes: changes, + loadMessage: { + try await self.getMessage(threadID: threadID, messageID: messageID) + }, + terminalAttachmentFailureState: { + try await self.terminalAttachmentFailureState(messageID: messageID) } - - try await Task.sleep(forTimeInterval: min(pollInterval, remainingTime)) - pollInterval = min(pollInterval * 2, loadAttachmentMaxPollInterval) - } + ) } /// Returns the first attachment state on `messageID` that is terminal failure @@ -764,46 +755,34 @@ public final class PlatformAPI { timeout: TimeInterval ) async throws -> [(rowID: Int, guid: String)] { let database = database + let changes = try database.changeTopic() return try await Task.detached(priority: .userInitiated) { - let start = Date() - let expectedNewMessageIDCount = text.map { max($0.linkCount, 1) } ?? 1 - var sentMessageIDs: [(rowID: Int, guid: String)] = [] - while sentMessageIDs.count != expectedNewMessageIDCount { - sentMessageIDs = try database.withDatabase { db in + try await DatabaseTickWaits.sentMessageIDs( + text: text, + timeout: timeout, + changes: changes + ) { + try database.withDatabase { db in try db.sentMessageIDs(since: lastRowID) } - if text != nil, !sentMessageIDs.isEmpty, Date().timeIntervalSince(start) > waitForLinksTimeout { - break - } - if Date().timeIntervalSince(start) > timeout { - throw ErrorMessage("timed out waiting for sent messages") - } - try await Task.sleep(forTimeInterval: sentMessagePollInterval) } - return sentMessageIDs }.value } private func waitForSentThreadIDs(messageRowIDs: [Int]) async throws -> [String?] { let database = database + let changes = try database.changeTopic() return try await Task.detached(priority: .userInitiated) { - let sentThreadIDs = { + try await DatabaseTickWaits.sentThreadIDs( + timeout: waitForSentThreadTimeout, + changes: changes + ) { try messageRowIDs.map { rowID in try database.withDatabase { db in try db.threadIDForMessage(rowID: rowID) } } } - var threadIDs = try sentThreadIDs() - let start = Date() - while threadIDs.contains(where: { $0 == nil }) { - try await Task.sleep(forTimeInterval: sentMessagePollInterval) - threadIDs = try sentThreadIDs() - if Date().timeIntervalSince(start) > waitForSentThreadTimeout { - break - } - } - return threadIDs }.value } diff --git a/src/IMessage/Sources/IMessageCore/Topic.swift b/src/IMessage/Sources/IMessageCore/Topic.swift index 33959af2..3cea527c 100644 --- a/src/IMessage/Sources/IMessageCore/Topic.swift +++ b/src/IMessage/Sources/IMessageCore/Topic.swift @@ -1,8 +1,10 @@ +import Foundation + public final class Topic { public typealias BufferingPolicy = AsyncStream.Continuation.BufferingPolicy private let bufferingPolicy: BufferingPolicy - private var subscriptions = Protected<[AsyncStream.Continuation]>([]) + private var subscriptions = Protected<[(id: UUID, continuation: AsyncStream.Continuation)]>([]) public init(bufferingPolicy: BufferingPolicy = .unbounded) { self.bufferingPolicy = bufferingPolicy @@ -13,33 +15,46 @@ extension Topic: @unchecked Sendable {} public extension Topic { func broadcast(_ value: sending T) { - subscriptions.withLock { - for subscription in $0 { - subscription.yield(value) - } + let currentSubscriptions = subscriptions.withLock { + $0.map(\.continuation) + } + for subscription in currentSubscriptions { + subscription.yield(value) } } func subscribe() -> AsyncStream { + let id = UUID() let (stream, cont) = AsyncStream.makeStream(of: T.self, bufferingPolicy: bufferingPolicy) + cont.onTermination = { [weak self] _ in + self?.subscriptions.withLock { + $0.removeAll { $0.id == id } + } + } subscriptions.withLock { - $0.append(cont) + $0.append((id, cont)) } return stream } + package var subscriptionCount: Int { + subscriptions.withLock { $0.count } + } + /** * Finishes all current subscribers and empties the subscriptions list. * * New subscribers may still be registered after calling this method. */ func finishCurrentSubscribers() { - subscriptions.withLock { - for subscription in $0 { - subscription.finish() - } + let currentSubscriptions = subscriptions.withLock { + let current = $0.map(\.continuation) $0.removeAll() + return current + } + for subscription in currentSubscriptions { + subscription.finish() } } } diff --git a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift new file mode 100644 index 00000000..3f4ad97f --- /dev/null +++ b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift @@ -0,0 +1,160 @@ +import Foundation +@testable import IMessage +import IMessageCore +import PlatformSDK +import Testing + +@Test func topicRemovesTerminatedSubscriptions() async throws { + let topic = Topic() + let task = Task { + for await _ in topic.subscribe() {} + } + + #expect(await eventually { topic.subscriptionCount == 1 }) + task.cancel() + await task.value + #expect(await eventually { topic.subscriptionCount == 0 }) +} + +@Test func sentMessageIDWaitReadsAgainOnlyAfterDatabaseTick() async throws { + let changes = Topic() + let sentRows = Protected<[DatabaseTickWaits.SentMessageID]>([]) + let queryCount = Protected(0) + + let task = Task { + try await DatabaseTickWaits.sentMessageIDs( + text: nil, + timeout: 1, + changes: changes + ) { + queryCount.withLock { $0 += 1 } + return sentRows.read() + } + } + + #expect(await eventually { queryCount.read() == 1 }) + try await Task.sleep(forTimeInterval: 0.1) + #expect(queryCount.read() == 1) + + sentRows.withLock { + $0 = [(rowID: 11, guid: "message-11")] + } + changes.broadcast(()) + + let result = try await task.value + #expect(result.map(\.rowID) == [11]) + #expect(queryCount.read() == 2) + #expect(await eventually { changes.subscriptionCount == 0 }) +} + +@Test func sentMessageIDWaitReturnsPartialLinkSendAfterLinkTimeout() async throws { + let changes = Topic() + let queryCount = Protected(0) + let startedAt = Date() + + let result = try await DatabaseTickWaits.sentMessageIDs( + text: "https://one.example https://two.example", + timeout: 1, + changes: changes, + linkTimeout: 0.05 + ) { + queryCount.withLock { $0 += 1 } + return [(rowID: 11, guid: "message-11")] + } + + #expect(result.map(\.rowID) == [11]) + #expect(queryCount.read() == 2) + #expect(Date().timeIntervalSince(startedAt) < 0.5) + #expect(await eventually { changes.subscriptionCount == 0 }) +} + +@Test func sentThreadIDWaitReadsAgainOnlyAfterDatabaseTick() async throws { + let changes = Topic() + let threadIDs = Protected<[String?]>([nil]) + let queryCount = Protected(0) + + let task = Task { + try await DatabaseTickWaits.sentThreadIDs( + timeout: 1, + changes: changes + ) { + queryCount.withLock { $0 += 1 } + return threadIDs.read() + } + } + + #expect(await eventually { queryCount.read() == 1 }) + try await Task.sleep(forTimeInterval: 0.1) + #expect(queryCount.read() == 1) + + threadIDs.withLock { + $0 = ["thread-1"] + } + changes.broadcast(()) + + let result = try await task.value + #expect(result == ["thread-1"]) + #expect(queryCount.read() == 2) + #expect(await eventually { changes.subscriptionCount == 0 }) +} + +@Test func loadedAttachmentWaitReadsAgainOnlyAfterDatabaseTick() async throws { + let changes = Topic() + let currentMessage = Protected(messageWithAttachmentLoading(true)) + let queryCount = Protected(0) + + let task = Task { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 1, + changes: changes, + loadMessage: { + queryCount.withLock { $0 += 1 } + return currentMessage.read() + }, + terminalAttachmentFailureState: { + nil + } + ) + } + + #expect(await eventually { queryCount.read() == 1 }) + try await Task.sleep(forTimeInterval: 0.1) + #expect(queryCount.read() == 1) + + currentMessage.withLock { + $0 = messageWithAttachmentLoading(false) + } + changes.broadcast(()) + + let result = try await task.value + #expect(result.attachments?.first?.loading == false) + #expect(queryCount.read() == 2) + #expect(await eventually { changes.subscriptionCount == 0 }) +} + +private func messageWithAttachmentLoading(_ loading: Bool) -> PlatformSDK.Message { + PlatformSDK.Message( + id: "message-1", + timestamp: 1, + senderID: "sender-1", + attachments: [ + PlatformSDK.Attachment( + id: "attachment-1", + type: .img, + loading: loading + ), + ] + ) +} + +private func eventually(timeout: TimeInterval = 1, _ predicate: () -> Bool) async -> Bool { + let deadline = Date().addingTimeInterval(timeout) + while Date() < deadline { + if predicate() { + return true + } + try? await Task.sleep(forTimeInterval: 0.01) + } + return predicate() +} From fa29fdfb0d08b0b2f71efe5c5694a098c7ec592d Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 01:23:56 +0530 Subject: [PATCH 02/13] harden database-tick waits: backstop, graceful watcher fallback, cancellation, tests - waitForChange caps each wait at a ~1s backstop so a dropped/coalesced FS tick re-queries at poll cadence instead of hanging to the full timeout - changeTopic() degrades gracefully when beginListeningForChanges() throws: log and return the (silent) topic; the backstop carries the wait - sentMessageIDs/sentThreadIDs waits run inline instead of Task.detached so caller cancellation propagates and they stop promptly - extract ensureDatabase(_:) to dedupe lazy DB init - add regression tests for the timeout/error paths in all three helpers and a finishCurrentSubscribers deadlock guard Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Sources/IMessage/DatabaseTickWaits.swift | 8 +- .../Sources/IMessage/PlatformAPI.swift | 64 +++++++------ .../IMessageTests/DatabaseTickWaitTests.swift | 92 +++++++++++++++++++ 3 files changed, 136 insertions(+), 28 deletions(-) diff --git a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift index c1f626d9..e79c45fe 100644 --- a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift +++ b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift @@ -5,6 +5,10 @@ import PlatformSDK private let sentMessageLinkWaitTimeout: TimeInterval = 1.5 +// Re-query at least this often even without a tick: FSEvents notifications can be +// dropped or coalesced, so a missed tick costs ~1s instead of the full timeout. +private let databaseTickBackstopInterval: TimeInterval = 1.0 + enum DatabaseTickWaits { typealias SentMessageID = (rowID: Int, guid: String) @@ -102,13 +106,15 @@ enum DatabaseTickWaits { let remainingTime = deadline.timeIntervalSinceNow guard remainingTime > 0 else { return false } + let sleepTime = min(remainingTime, databaseTickBackstopInterval) + return try await withThrowingTaskGroup(of: Bool.self) { group in group.addTask { var iterator = stream.makeAsyncIterator() return await iterator.next() != nil } group.addTask { - try await Task.sleep(forTimeInterval: remainingTime) + try await Task.sleep(forTimeInterval: sleepTime) return false } diff --git a/src/IMessage/Sources/IMessage/PlatformAPI.swift b/src/IMessage/Sources/IMessage/PlatformAPI.swift index 6d69e07b..a2e1747a 100644 --- a/src/IMessage/Sources/IMessage/PlatformAPI.swift +++ b/src/IMessage/Sources/IMessage/PlatformAPI.swift @@ -16,24 +16,35 @@ private final class PlatformAPIDatabase: @unchecked Sendable { func withDatabase(_ action: (IMDatabase) throws -> T) throws -> T { try state.withLock { state in - let db = try state.database ?? IMDatabase(createIndexes: true) - state.database = db - return try action(db) + try action(try ensureDatabase(&state)) } } func changeTopic() throws -> Topic { try state.withLock { state in - let db = try state.database ?? IMDatabase(createIndexes: true) - state.database = db + let db = try ensureDatabase(&state) if !state.isListeningForChanges { - try db.beginListeningForChanges() - state.isListeningForChanges = true + do { + try db.beginListeningForChanges() + state.isListeningForChanges = true + } catch { + // Don't fail the wait: the topic just won't tick, and the + // backstop in DatabaseTickWaits carries it at poll cadence. + // Flag stays unset so the next call retries setup. + platformLog.error("failed to begin listening for database changes, falling back to backstop polling: \(error)") + } } return db.changes } } + /// Lazily opens the process-wide database, caching it on `state`. + private func ensureDatabase(_ state: inout State) throws -> IMDatabase { + let db = try state.database ?? IMDatabase(createIndexes: true) + state.database = db + return db + } + private struct State { var database: IMDatabase? var isListeningForChanges = false @@ -756,34 +767,33 @@ public final class PlatformAPI { ) async throws -> [(rowID: Int, guid: String)] { let database = database let changes = try database.changeTopic() - return try await Task.detached(priority: .userInitiated) { - try await DatabaseTickWaits.sentMessageIDs( - text: text, - timeout: timeout, - changes: changes - ) { - try database.withDatabase { db in - try db.sentMessageIDs(since: lastRowID) - } + // Not via Task.detached, so caller cancellation propagates and the wait + // stops promptly instead of running to its own timeout. + return try await DatabaseTickWaits.sentMessageIDs( + text: text, + timeout: timeout, + changes: changes + ) { + try database.withDatabase { db in + try db.sentMessageIDs(since: lastRowID) } - }.value + } } private func waitForSentThreadIDs(messageRowIDs: [Int]) async throws -> [String?] { let database = database let changes = try database.changeTopic() - return try await Task.detached(priority: .userInitiated) { - try await DatabaseTickWaits.sentThreadIDs( - timeout: waitForSentThreadTimeout, - changes: changes - ) { - try messageRowIDs.map { rowID in - try database.withDatabase { db in - try db.threadIDForMessage(rowID: rowID) - } + // Not via Task.detached, so caller cancellation propagates. + return try await DatabaseTickWaits.sentThreadIDs( + timeout: waitForSentThreadTimeout, + changes: changes + ) { + try messageRowIDs.map { rowID in + try database.withDatabase { db in + try db.threadIDForMessage(rowID: rowID) } } - }.value + } } private func sentMessages(_ sentMessageIDs: [(rowID: Int, guid: String)]) async throws -> [PlatformSDK.Message] { diff --git a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift index 3f4ad97f..021daac5 100644 --- a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift +++ b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift @@ -1,4 +1,5 @@ import Foundation +import IMDatabase @testable import IMessage import IMessageCore import PlatformSDK @@ -133,6 +134,97 @@ import Testing #expect(await eventually { changes.subscriptionCount == 0 }) } +@Test func sentMessageIDWaitThrowsOnTimeoutWithoutTick() async throws { + let changes = Topic() + await #expect(throws: (any Error).self) { + try await DatabaseTickWaits.sentMessageIDs( + text: nil, + timeout: 0.2, + changes: changes + ) { + [] + } + } + #expect(await eventually { changes.subscriptionCount == 0 }) +} + +@Test func sentThreadIDWaitReturnsPartialAfterDeadline() async throws { + let changes = Topic() + let result = try await DatabaseTickWaits.sentThreadIDs( + timeout: 0.2, + changes: changes + ) { + [nil, "thread-2"] + } + + #expect(result == [nil, "thread-2"]) + #expect(await eventually { changes.subscriptionCount == 0 }) +} + +@Test func loadedAttachmentWaitThrowsWhenMessageHasNoAttachments() async throws { + let changes = Topic() + await #expect(throws: (any Error).self) { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 1, + changes: changes, + loadMessage: { messageWithNoAttachments() }, + terminalAttachmentFailureState: { nil } + ) + } + #expect(await eventually { changes.subscriptionCount == 0 }) +} + +@Test func loadedAttachmentWaitThrowsOnTerminalFailureState() async throws { + let changes = Topic() + await #expect(throws: (any Error).self) { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 1, + changes: changes, + loadMessage: { messageWithAttachmentLoading(true) }, + terminalAttachmentFailureState: { Attachment.IMFileTransferState.error } + ) + } + #expect(await eventually { changes.subscriptionCount == 0 }) +} + +@Test func loadedAttachmentWaitThrowsOnTimeoutWithoutTick() async throws { + let changes = Topic() + await #expect(throws: (any Error).self) { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 0.2, + changes: changes, + loadMessage: { messageWithAttachmentLoading(true) }, + terminalAttachmentFailureState: { nil } + ) + } + #expect(await eventually { changes.subscriptionCount == 0 }) +} + +// Guards the lock-release-before-finish() fix: calling finish() under the lock +// would re-enter onTermination on the non-reentrant os_unfair_lock and deadlock. +@Test func finishCurrentSubscribersDoesNotDeadlockAndClearsSubscriptions() { + let topic = Topic() + let streams = (0 ..< 5).map { _ in topic.subscribe() } + #expect(topic.subscriptionCount == 5) + + topic.finishCurrentSubscribers() + + #expect(topic.subscriptionCount == 0) + withExtendedLifetime(streams) {} +} + +private func messageWithNoAttachments() -> PlatformSDK.Message { + PlatformSDK.Message( + id: "message-1", + timestamp: 1, + senderID: "sender-1", + attachments: [] + ) +} + private func messageWithAttachmentLoading(_ loading: Bool) -> PlatformSDK.Message { PlatformSDK.Message( id: "message-1", From 3fbb0e44134590218355c4bd4110191a223d4a55 Mon Sep 17 00:00:00 2001 From: "indent[bot]" <216979840+indent[bot]@users.noreply.github.com> Date: Sat, 23 May 2026 20:05:26 +0000 Subject: [PATCH 03/13] ensureDatabase: skip redundant cache write on hits Return early when state.database is already set so cache hits don't re-store the same reference under the lock. Generated with [Indent](https://indent.com) Co-Authored-By: KishanBagaria --- src/IMessage/Sources/IMessage/PlatformAPI.swift | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/IMessage/Sources/IMessage/PlatformAPI.swift b/src/IMessage/Sources/IMessage/PlatformAPI.swift index a2e1747a..581535bd 100644 --- a/src/IMessage/Sources/IMessage/PlatformAPI.swift +++ b/src/IMessage/Sources/IMessage/PlatformAPI.swift @@ -40,7 +40,10 @@ private final class PlatformAPIDatabase: @unchecked Sendable { /// Lazily opens the process-wide database, caching it on `state`. private func ensureDatabase(_ state: inout State) throws -> IMDatabase { - let db = try state.database ?? IMDatabase(createIndexes: true) + if let cached = state.database { + return cached + } + let db = try IMDatabase(createIndexes: true) state.database = db return db } From 737c1fa3ed5c2150dfd1dbab638aa7f56d65c3ca Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 02:02:50 +0530 Subject: [PATCH 04/13] address code review: backstop/cancellation tests, idempotent watcher setup, off-lock changeTopic - test the 1.0s backstop re-query path with no FS tick (injectable backstop interval) - test caller-cancellation propagation through DatabaseTickWaits (prompt throw + subscription cleanup) - test loadedAttachment's message-not-found throw path - changeTopic(): run beginListeningForChanges() outside the DB lock; re-acquire only to record state - replace isListeningForChanges bool with ListeningState (notStarted/listening/failed): log once and accept backstop-degraded mode instead of retrying setup on every send - make beginListeningForChanges() idempotent: cancel the prior debouncer Task and tear down partial watcher state on throw Co-Authored-By: Claude Opus 4.7 (1M context) --- .../IMDatabase/Database/IMDatabase.swift | 34 +++++++++ .../Sources/IMessage/DatabaseTickWaits.swift | 13 ++-- .../Sources/IMessage/PlatformAPI.swift | 49 +++++++++---- .../IMessageTests/DatabaseTickWaitTests.swift | 70 +++++++++++++++++++ 4 files changed, 148 insertions(+), 18 deletions(-) diff --git a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift index 94ee854e..53ed56a4 100644 --- a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift +++ b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift @@ -96,8 +96,41 @@ public extension IMDatabase { func beginListeningForChanges() throws { log.info("setting up filesystem watchers") + // Idempotent: a prior (possibly partial) setup may have left a debouncer + // Task and file watchers behind. Tear them down first so a retry doesn't + // leak the old debouncer Task or double-register watchers. + debouncer?.cancel() + debouncer = nil + for watcher in fileWatchers { + watcher.stopListeningIfNecessary() + } + fileWatchers.removeAll() + let unthrottledChanges = Topic() + // If setup throws partway, clean up whatever we managed to register so a + // later retry starts from a clean slate instead of half-wired watchers. + var directoryWatcher: FSEventsWatcher? + func cleanupPartialSetup() { + if let directoryWatcher { + directoryWatcher.stop() + directoryWatcher.invalidate() + } + for watcher in fileWatchers { + watcher.stopListeningIfNecessary() + } + fileWatchers.removeAll() + } + + do { + try setUpListeners(unthrottledChanges: unthrottledChanges, directoryWatcher: &directoryWatcher) + } catch { + cleanupPartialSetup() + throw error + } + } + + private func setUpListeners(unthrottledChanges: Topic, directoryWatcher directoryWatcherOut: inout FSEventsWatcher?) throws { // listen to ~/Library/Messages itself in order to respond to the WAL // file being (re)created/deleted let directoryWatcher = try FSEventsWatcher(watchingPath: messagesDataDirectory.path, latency: 1.0) { [weak self] _, event in @@ -125,6 +158,7 @@ public extension IMDatabase { } } directoryWatcher.setDispatchQueue(fsEventsQueue) + directoryWatcherOut = directoryWatcher try directoryWatcher.start() try ensureDatabaseFileWatchers(broadcastingTo: unthrottledChanges) diff --git a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift index e79c45fe..f1771967 100644 --- a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift +++ b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift @@ -17,6 +17,7 @@ enum DatabaseTickWaits { timeout: TimeInterval, changes: Topic, linkTimeout: TimeInterval = sentMessageLinkWaitTimeout, + backstopInterval: TimeInterval = databaseTickBackstopInterval, querySentMessageIDs: @escaping @Sendable () throws -> [SentMessageID] ) async throws -> [SentMessageID] { let startedAt = Date() @@ -43,13 +44,14 @@ enum DatabaseTickWaits { } else { wakeDeadline = timeoutDeadline } - _ = try await waitForChange(on: changeStream, until: wakeDeadline) + _ = try await waitForChange(on: changeStream, until: wakeDeadline, backstopInterval: backstopInterval) } } static func sentThreadIDs( timeout: TimeInterval, changes: Topic, + backstopInterval: TimeInterval = databaseTickBackstopInterval, querySentThreadIDs: @escaping @Sendable () throws -> [String?] ) async throws -> [String?] { let deadline = Date().addingTimeInterval(timeout) @@ -61,7 +63,7 @@ enum DatabaseTickWaits { return threadIDs } - _ = try await waitForChange(on: changeStream, until: deadline) + _ = try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval) } } @@ -69,6 +71,7 @@ enum DatabaseTickWaits { messageID: String, timeout: TimeInterval, changes: Topic, + backstopInterval: TimeInterval = databaseTickBackstopInterval, loadMessage: @escaping @Sendable () async throws -> PlatformSDK.Message?, terminalAttachmentFailureState: @escaping @Sendable () async throws -> Attachment.IMFileTransferState? ) async throws -> PlatformSDK.Message { @@ -98,15 +101,15 @@ enum DatabaseTickWaits { throw ErrorMessage("Timed out waiting for attachment in message \(messageID) to load") } - _ = try await waitForChange(on: changeStream, until: deadline) + _ = try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval) } } - private static func waitForChange(on stream: AsyncStream, until deadline: Date) async throws -> Bool { + private static func waitForChange(on stream: AsyncStream, until deadline: Date, backstopInterval: TimeInterval) async throws -> Bool { let remainingTime = deadline.timeIntervalSinceNow guard remainingTime > 0 else { return false } - let sleepTime = min(remainingTime, databaseTickBackstopInterval) + let sleepTime = min(remainingTime, backstopInterval) return try await withThrowingTaskGroup(of: Bool.self) { group in group.addTask { diff --git a/src/IMessage/Sources/IMessage/PlatformAPI.swift b/src/IMessage/Sources/IMessage/PlatformAPI.swift index 581535bd..c884e1eb 100644 --- a/src/IMessage/Sources/IMessage/PlatformAPI.swift +++ b/src/IMessage/Sources/IMessage/PlatformAPI.swift @@ -21,21 +21,36 @@ private final class PlatformAPIDatabase: @unchecked Sendable { } func changeTopic() throws -> Topic { - try state.withLock { state in + // Snapshot the db reference (and whether setup is still pending) under + // the lock, then run the external setup call OUTSIDE the lock: + // beginListeningForChanges opens FDs, creates an FSEventStream, and + // spawns a Task, none of which should run while we hold the lock. + let (db, shouldSetUp) = try state.withLock { state -> (IMDatabase, Bool) in let db = try ensureDatabase(&state) - if !state.isListeningForChanges { - do { - try db.beginListeningForChanges() - state.isListeningForChanges = true - } catch { - // Don't fail the wait: the topic just won't tick, and the - // backstop in DatabaseTickWaits carries it at poll cadence. - // Flag stays unset so the next call retries setup. - platformLog.error("failed to begin listening for database changes, falling back to backstop polling: \(error)") - } - } + return (db, state.listening == .notStarted) + } + + guard shouldSetUp else { return db.changes } + + // Two callers can both observe `.notStarted` in a rare window and each + // run setup once. beginListeningForChanges is idempotent (it tears down + // any prior watchers/debouncer first), so at most one extra setup in + // that window is harmless; steady state never double-spawns watchers. + do { + try db.beginListeningForChanges() + state.withLock { $0.listening = .listening } + } catch { + // Policy: log once and accept backstop-degraded mode. The topic just + // won't tick, but the 1.0s backstop in DatabaseTickWaits carries + // correctness at poll cadence. We deliberately do NOT retry setup on + // every subsequent call (no backoff) to avoid log/FD churn on a + // persistent failure. + state.withLock { $0.listening = .failed } + platformLog.error("failed to begin listening for database changes, falling back to backstop polling: \(error)") + } + return db.changes } /// Lazily opens the process-wide database, caching it on `state`. @@ -48,9 +63,17 @@ private final class PlatformAPIDatabase: @unchecked Sendable { return db } + /// Tracks change-listener setup so we set it up once and don't re-run it on + /// every wait after a persistent failure. + private enum ListeningState { + case notStarted + case listening + case failed + } + private struct State { var database: IMDatabase? - var isListeningForChanges = false + var listening: ListeningState = .notStarted } } diff --git a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift index 021daac5..6d8b94f0 100644 --- a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift +++ b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift @@ -48,6 +48,62 @@ import Testing #expect(await eventually { changes.subscriptionCount == 0 }) } +// Proves the backstop re-queries WITHOUT any broadcast(()): the result flips +// after the (short, injected) backstop interval and is picked up by the poll, +// well before the much-longer timeout. +@Test func sentMessageIDWaitReQueriesOnBackstopWithoutTick() async throws { + let changes = Topic() + let sentRows = Protected<[DatabaseTickWaits.SentMessageID]>([]) + let startedAt = Date() + + Task { + try? await Task.sleep(forTimeInterval: 0.15) + sentRows.withLock { + $0 = [(rowID: 11, guid: "message-11")] + } + } + + let result = try await DatabaseTickWaits.sentMessageIDs( + text: nil, + timeout: 2.5, + changes: changes, + backstopInterval: 0.1 + ) { + sentRows.read() + } + + #expect(result.map(\.rowID) == [11]) + // Never broadcast: the backstop alone must have driven the re-query. + #expect(Date().timeIntervalSince(startedAt) < 1.0) + #expect(await eventually { changes.subscriptionCount == 0 }) +} + +// Proves caller cancellation propagates: a never-satisfied wait with a long +// timeout finishes promptly when its task is cancelled, and unsubscribes. +@Test func sentMessageIDWaitPropagatesCallerCancellation() async throws { + let changes = Topic() + + let task = Task { + try await DatabaseTickWaits.sentMessageIDs( + text: nil, + timeout: 5, + changes: changes + ) { + [] + } + } + + #expect(await eventually { changes.subscriptionCount == 1 }) + task.cancel() + + let startedAt = Date() + await #expect(throws: (any Error).self) { + _ = try await task.value + } + #expect(Date().timeIntervalSince(startedAt) < 1.0) + #expect(await eventually { changes.subscriptionCount == 0 }) +} + @Test func sentMessageIDWaitReturnsPartialLinkSendAfterLinkTimeout() async throws { let changes = Topic() let queryCount = Protected(0) @@ -175,6 +231,20 @@ import Testing #expect(await eventually { changes.subscriptionCount == 0 }) } +@Test func loadedAttachmentWaitThrowsWhenMessageNotFound() async throws { + let changes = Topic() + await #expect(throws: (any Error).self) { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 1, + changes: changes, + loadMessage: { nil }, + terminalAttachmentFailureState: { nil } + ) + } + #expect(await eventually { changes.subscriptionCount == 0 }) +} + @Test func loadedAttachmentWaitThrowsOnTerminalFailureState() async throws { let changes = Topic() await #expect(throws: (any Error).self) { From 9343d8c51da241dbf8df8433cba8e637f0d9dc17 Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 02:17:17 +0530 Subject: [PATCH 05/13] fix concurrent change-listener setup race and unstarted-watcher cleanup changeTopic() ran beginListeningForChanges() outside the state lock and let two callers both observe .notStarted, so two threads could concurrently mutate IMDatabase's unsynchronized fileWatchers array and debouncer. Claim the setup slot atomically (.notStarted -> .starting) under the lock so only one caller runs setup; the loser returns the topic and rides the backstop. Also assign directoryWatcherOut only after start() succeeds, so the partial-setup cleanup can't stop/invalidate a never-started FSEventStream. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../IMDatabase/Database/IMDatabase.swift | 5 ++++- .../Sources/IMessage/PlatformAPI.swift | 22 +++++++++++++------ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift index 53ed56a4..9ebb62e6 100644 --- a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift +++ b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift @@ -158,8 +158,11 @@ public extension IMDatabase { } } directoryWatcher.setDispatchQueue(fsEventsQueue) - directoryWatcherOut = directoryWatcher try directoryWatcher.start() + // Only hand the watcher to the caller's cleanup path once it has actually + // started; stopping/invalidating a never-started FSEventStream trips a + // CoreServices state assertion. + directoryWatcherOut = directoryWatcher try ensureDatabaseFileWatchers(broadcastingTo: unthrottledChanges) diff --git a/src/IMessage/Sources/IMessage/PlatformAPI.swift b/src/IMessage/Sources/IMessage/PlatformAPI.swift index c884e1eb..81adabcc 100644 --- a/src/IMessage/Sources/IMessage/PlatformAPI.swift +++ b/src/IMessage/Sources/IMessage/PlatformAPI.swift @@ -21,23 +21,30 @@ private final class PlatformAPIDatabase: @unchecked Sendable { } func changeTopic() throws -> Topic { - // Snapshot the db reference (and whether setup is still pending) under - // the lock, then run the external setup call OUTSIDE the lock: + // Snapshot the db reference under the lock and atomically claim the setup + // slot, then run the external setup call OUTSIDE the lock: // beginListeningForChanges opens FDs, creates an FSEventStream, and // spawns a Task, none of which should run while we hold the lock. + // + // The claim must be atomic: IMDatabase's watcher/debouncer state is NOT + // thread-safe, so two callers must never run beginListeningForChanges + // concurrently. We transition `.notStarted` -> `.starting` under the lock; + // only the winner runs setup. The loser returns the (already-created) + // topic immediately and relies on the 1.0s backstop in DatabaseTickWaits + // for correctness until the winner finishes wiring up the watchers. let (db, shouldSetUp) = try state.withLock { state -> (IMDatabase, Bool) in let db = try ensureDatabase(&state) - return (db, state.listening == .notStarted) + guard state.listening == .notStarted else { + return (db, false) + } + state.listening = .starting + return (db, true) } guard shouldSetUp else { return db.changes } - // Two callers can both observe `.notStarted` in a rare window and each - // run setup once. beginListeningForChanges is idempotent (it tears down - // any prior watchers/debouncer first), so at most one extra setup in - // that window is harmless; steady state never double-spawns watchers. do { try db.beginListeningForChanges() state.withLock { $0.listening = .listening } @@ -67,6 +74,7 @@ private final class PlatformAPIDatabase: @unchecked Sendable { /// every wait after a persistent failure. private enum ListeningState { case notStarted + case starting case listening case failed } From a3758980162a3340403e34f0e0ace8fb90f36984 Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 02:22:14 +0530 Subject: [PATCH 06/13] fix database listener teardown Retain the directory watcher so repeated listener setup can stop it alongside file watchers and the debouncer. Serialize listener setup and filesystem callback refreshes to avoid racing watcher state. Co-Authored-By: Codex --- .../IMDatabase/Database/IMDatabase.swift | 68 +++++++++---------- 1 file changed, 32 insertions(+), 36 deletions(-) diff --git a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift index 9ebb62e6..5ad2d309 100644 --- a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift +++ b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift @@ -36,6 +36,8 @@ public final class IMDatabase { // any time private var fileWatchers = [FileWatcher]() + private let listenerLock = Protected(()) + private var directoryWatcher: FSEventsWatcher? private var debouncer: Task? public var noisy = false @@ -72,10 +74,9 @@ public final class IMDatabase { deinit { log.debug("being deallocated, stopping watchers and listeners if necessary") - for watcher in fileWatchers { - watcher.stopListeningIfNecessary() + listenerLock.withLock { _ in + stopListeningForChangesLocked() } - debouncer?.cancel() } } @@ -94,43 +95,39 @@ private extension IMDatabase { public extension IMDatabase { func beginListeningForChanges() throws { - log.info("setting up filesystem watchers") + try listenerLock.withLock { _ in + log.info("setting up filesystem watchers") - // Idempotent: a prior (possibly partial) setup may have left a debouncer - // Task and file watchers behind. Tear them down first so a retry doesn't - // leak the old debouncer Task or double-register watchers. - debouncer?.cancel() - debouncer = nil - for watcher in fileWatchers { - watcher.stopListeningIfNecessary() - } - fileWatchers.removeAll() + stopListeningForChangesLocked() - let unthrottledChanges = Topic() + let unthrottledChanges = Topic() - // If setup throws partway, clean up whatever we managed to register so a - // later retry starts from a clean slate instead of half-wired watchers. - var directoryWatcher: FSEventsWatcher? - func cleanupPartialSetup() { - if let directoryWatcher { - directoryWatcher.stop() - directoryWatcher.invalidate() - } - for watcher in fileWatchers { - watcher.stopListeningIfNecessary() + do { + try setUpListeners(unthrottledChanges: unthrottledChanges) + } catch { + stopListeningForChangesLocked() + throw error } - fileWatchers.removeAll() } + } + + fileprivate func stopListeningForChangesLocked() { + if let directoryWatcher { + directoryWatcher.stop() + directoryWatcher.invalidate() + self.directoryWatcher = nil + } + + debouncer?.cancel() + debouncer = nil - do { - try setUpListeners(unthrottledChanges: unthrottledChanges, directoryWatcher: &directoryWatcher) - } catch { - cleanupPartialSetup() - throw error + for watcher in fileWatchers { + watcher.stopListeningIfNecessary() } + fileWatchers.removeAll() } - private func setUpListeners(unthrottledChanges: Topic, directoryWatcher directoryWatcherOut: inout FSEventsWatcher?) throws { + private func setUpListeners(unthrottledChanges: Topic) throws { // listen to ~/Library/Messages itself in order to respond to the WAL // file being (re)created/deleted let directoryWatcher = try FSEventsWatcher(watchingPath: messagesDataDirectory.path, latency: 1.0) { [weak self] _, event in @@ -152,17 +149,16 @@ public extension IMDatabase { log.debug("received FSEvent [\(event.id)] \(anonymizedPath) \(event.flags)") do { - try ensureDatabaseFileWatchers(broadcastingTo: unthrottledChanges) + try listenerLock.withLock { _ in + try self.ensureDatabaseFileWatchers(broadcastingTo: unthrottledChanges) + } } catch { log.error("failed to ensure database file watchers in response to WAL file event: \(error)") } } directoryWatcher.setDispatchQueue(fsEventsQueue) try directoryWatcher.start() - // Only hand the watcher to the caller's cleanup path once it has actually - // started; stopping/invalidating a never-started FSEventStream trips a - // CoreServices state assertion. - directoryWatcherOut = directoryWatcher + self.directoryWatcher = directoryWatcher try ensureDatabaseFileWatchers(broadcastingTo: unthrottledChanges) From d04d9dcae624126a252cc338e07fff3b753a77b2 Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 02:27:59 +0530 Subject: [PATCH 07/13] - Co-Authored-By: Codex --- .../Sources/IMessage/DatabaseTickWaits.swift | 141 +++++++++++------- src/IMessage/Sources/IMessageCore/Topic.swift | 10 +- .../IMessageTests/DatabaseTickWaitTests.swift | 13 +- 3 files changed, 101 insertions(+), 63 deletions(-) diff --git a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift index f1771967..0605bd87 100644 --- a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift +++ b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift @@ -12,6 +12,11 @@ private let databaseTickBackstopInterval: TimeInterval = 1.0 enum DatabaseTickWaits { typealias SentMessageID = (rowID: Int, guid: String) + private enum WaitResult { + case finished(T) + case waitingUntil(Date) + } + static func sentMessageIDs( text: String?, timeout: TimeInterval, @@ -25,27 +30,32 @@ enum DatabaseTickWaits { let linkDeadline = startedAt.addingTimeInterval(linkTimeout) let expectedNewMessageIDCount = text.map { max($0.linkCount, 1) } ?? 1 - while true { - let changeStream = changes.subscribe() - let sentMessageIDs = try querySentMessageIDs() - if sentMessageIDs.count == expectedNewMessageIDCount { - return sentMessageIDs - } - if text != nil, !sentMessageIDs.isEmpty, Date() >= linkDeadline { - return sentMessageIDs - } - if Date() >= timeoutDeadline { - throw ErrorMessage("timed out waiting for sent messages") - } + return try await waitForDatabaseResult( + changes: changes, + backstopInterval: backstopInterval, + query: { + try querySentMessageIDs() + }, + evaluate: { sentMessageIDs in + if sentMessageIDs.count == expectedNewMessageIDCount { + return .finished(sentMessageIDs) + } + if text != nil, !sentMessageIDs.isEmpty, Date() >= linkDeadline { + return .finished(sentMessageIDs) + } + if Date() >= timeoutDeadline { + throw ErrorMessage("timed out waiting for sent messages") + } - let wakeDeadline: Date - if text != nil, !sentMessageIDs.isEmpty { - wakeDeadline = min(timeoutDeadline, linkDeadline) - } else { - wakeDeadline = timeoutDeadline + let wakeDeadline: Date + if text != nil, !sentMessageIDs.isEmpty { + wakeDeadline = min(timeoutDeadline, linkDeadline) + } else { + wakeDeadline = timeoutDeadline + } + return .waitingUntil(wakeDeadline) } - _ = try await waitForChange(on: changeStream, until: wakeDeadline, backstopInterval: backstopInterval) - } + ) } static func sentThreadIDs( @@ -56,15 +66,19 @@ enum DatabaseTickWaits { ) async throws -> [String?] { let deadline = Date().addingTimeInterval(timeout) - while true { - let changeStream = changes.subscribe() - let threadIDs = try querySentThreadIDs() - if !threadIDs.contains(where: { $0 == nil }) || Date() >= deadline { - return threadIDs + return try await waitForDatabaseResult( + changes: changes, + backstopInterval: backstopInterval, + query: { + try querySentThreadIDs() + }, + evaluate: { threadIDs in + if !threadIDs.contains(where: { $0 == nil }) || Date() >= deadline { + return .finished(threadIDs) + } + return .waitingUntil(deadline) } - - _ = try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval) - } + ) } static func loadedAttachment( @@ -78,53 +92,74 @@ enum DatabaseTickWaits { let deadline = Date().addingTimeInterval(timeout) var isFirstRead = true - while true { - let changeStream = changes.subscribe() - let message = try await loadMessage() - .orThrow(ErrorMessage("Could not find message \(messageID)")) - let attachments = message.attachments ?? [] - if isFirstRead { - guard !attachments.isEmpty else { - throw ErrorMessage("Message \(messageID) has no attachments") + return try await waitForDatabaseResult( + changes: changes, + backstopInterval: backstopInterval, + query: { + try await loadMessage() + .orThrow(ErrorMessage("Could not find message \(messageID)")) + }, + evaluate: { message in + let attachments = message.attachments ?? [] + if isFirstRead { + guard !attachments.isEmpty else { + throw ErrorMessage("Message \(messageID) has no attachments") + } + isFirstRead = false + } + if !attachments.isEmpty, !attachments.contains(where: { $0.loading == true }) { + return .finished(message) } - isFirstRead = false - } - if !attachments.isEmpty, !attachments.contains(where: { $0.loading == true }) { - return message - } - if let failureState = try await terminalAttachmentFailureState() { - throw ErrorMessage("Attachment in message \(messageID) failed to load (transfer state: \(failureState.rawValue))") - } + if let failureState = try await terminalAttachmentFailureState() { + throw ErrorMessage("Attachment in message \(messageID) failed to load (transfer state: \(failureState.rawValue))") + } - guard Date() < deadline else { - throw ErrorMessage("Timed out waiting for attachment in message \(messageID) to load") + guard Date() < deadline else { + throw ErrorMessage("Timed out waiting for attachment in message \(messageID) to load") + } + + return .waitingUntil(deadline) } + ) + } - _ = try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval) + private static func waitForDatabaseResult( + changes: Topic, + backstopInterval: TimeInterval, + query: @escaping @Sendable () async throws -> T, + evaluate: (T) async throws -> WaitResult + ) async throws -> T { + while true { + let changeStream = changes.subscribe() + let result = try await query() + switch try await evaluate(result) { + case let .finished(value): + return value + case let .waitingUntil(deadline): + try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval) + } } } - private static func waitForChange(on stream: AsyncStream, until deadline: Date, backstopInterval: TimeInterval) async throws -> Bool { + private static func waitForChange(on stream: AsyncStream, until deadline: Date, backstopInterval: TimeInterval) async throws { let remainingTime = deadline.timeIntervalSinceNow - guard remainingTime > 0 else { return false } + guard remainingTime > 0 else { return } let sleepTime = min(remainingTime, backstopInterval) - return try await withThrowingTaskGroup(of: Bool.self) { group in + try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { var iterator = stream.makeAsyncIterator() - return await iterator.next() != nil + _ = await iterator.next() } group.addTask { try await Task.sleep(forTimeInterval: sleepTime) - return false } do { - let changed = try await group.next() ?? false + _ = try await group.next() group.cancelAll() - return changed } catch { group.cancelAll() throw error diff --git a/src/IMessage/Sources/IMessageCore/Topic.swift b/src/IMessage/Sources/IMessageCore/Topic.swift index 3cea527c..0635b58a 100644 --- a/src/IMessage/Sources/IMessageCore/Topic.swift +++ b/src/IMessage/Sources/IMessageCore/Topic.swift @@ -4,7 +4,7 @@ public final class Topic { public typealias BufferingPolicy = AsyncStream.Continuation.BufferingPolicy private let bufferingPolicy: BufferingPolicy - private var subscriptions = Protected<[(id: UUID, continuation: AsyncStream.Continuation)]>([]) + private var subscriptions = Protected<[UUID: AsyncStream.Continuation]>([:]) public init(bufferingPolicy: BufferingPolicy = .unbounded) { self.bufferingPolicy = bufferingPolicy @@ -16,7 +16,7 @@ extension Topic: @unchecked Sendable {} public extension Topic { func broadcast(_ value: sending T) { let currentSubscriptions = subscriptions.withLock { - $0.map(\.continuation) + Array($0.values) } for subscription in currentSubscriptions { subscription.yield(value) @@ -28,11 +28,11 @@ public extension Topic { let (stream, cont) = AsyncStream.makeStream(of: T.self, bufferingPolicy: bufferingPolicy) cont.onTermination = { [weak self] _ in self?.subscriptions.withLock { - $0.removeAll { $0.id == id } + $0[id] = nil } } subscriptions.withLock { - $0.append((id, cont)) + $0[id] = cont } return stream @@ -49,7 +49,7 @@ public extension Topic { */ func finishCurrentSubscribers() { let currentSubscriptions = subscriptions.withLock { - let current = $0.map(\.continuation) + let current = Array($0.values) $0.removeAll() return current } diff --git a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift index 6d8b94f0..0e4e660f 100644 --- a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift +++ b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift @@ -54,6 +54,7 @@ import Testing @Test func sentMessageIDWaitReQueriesOnBackstopWithoutTick() async throws { let changes = Topic() let sentRows = Protected<[DatabaseTickWaits.SentMessageID]>([]) + let queryCount = Protected(0) let startedAt = Date() Task { @@ -65,16 +66,18 @@ import Testing let result = try await DatabaseTickWaits.sentMessageIDs( text: nil, - timeout: 2.5, + timeout: 5, changes: changes, backstopInterval: 0.1 ) { - sentRows.read() + queryCount.withLock { $0 += 1 } + return sentRows.read() } #expect(result.map(\.rowID) == [11]) + #expect(queryCount.read() >= 2) // Never broadcast: the backstop alone must have driven the re-query. - #expect(Date().timeIntervalSince(startedAt) < 1.0) + #expect(Date().timeIntervalSince(startedAt) < 4.0) #expect(await eventually { changes.subscriptionCount == 0 }) } @@ -111,7 +114,7 @@ import Testing let result = try await DatabaseTickWaits.sentMessageIDs( text: "https://one.example https://two.example", - timeout: 1, + timeout: 5, changes: changes, linkTimeout: 0.05 ) { @@ -121,7 +124,7 @@ import Testing #expect(result.map(\.rowID) == [11]) #expect(queryCount.read() == 2) - #expect(Date().timeIntervalSince(startedAt) < 0.5) + #expect(Date().timeIntervalSince(startedAt) < 4.0) #expect(await eventually { changes.subscriptionCount == 0 }) } From fb3344cd9b412988f1327274df10b738a9ab1fe5 Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 02:34:43 +0530 Subject: [PATCH 08/13] $compound-engineering:ce-simplify-code Co-Authored-By: Codex --- .../IMDatabase/Database/IMDatabase.swift | 2 +- .../Sources/IMessage/DatabaseTickWaits.swift | 16 ++++----- .../Sources/IMessage/PlatformAPI.swift | 35 +++---------------- src/IMessage/Sources/IMessageCore/Topic.swift | 10 +++--- .../IMessageTests/DatabaseTickWaitTests.swift | 2 +- todos.md | 4 ++- 6 files changed, 22 insertions(+), 47 deletions(-) diff --git a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift index 5ad2d309..d474be3f 100644 --- a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift +++ b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift @@ -111,7 +111,7 @@ public extension IMDatabase { } } - fileprivate func stopListeningForChangesLocked() { + private func stopListeningForChangesLocked() { if let directoryWatcher { directoryWatcher.stop() directoryWatcher.invalidate() diff --git a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift index 0605bd87..1072b6e4 100644 --- a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift +++ b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift @@ -37,13 +37,14 @@ enum DatabaseTickWaits { try querySentMessageIDs() }, evaluate: { sentMessageIDs in + let now = Date() if sentMessageIDs.count == expectedNewMessageIDCount { return .finished(sentMessageIDs) } - if text != nil, !sentMessageIDs.isEmpty, Date() >= linkDeadline { + if text != nil, !sentMessageIDs.isEmpty, now >= linkDeadline { return .finished(sentMessageIDs) } - if Date() >= timeoutDeadline { + if now >= timeoutDeadline { throw ErrorMessage("timed out waiting for sent messages") } @@ -73,7 +74,7 @@ enum DatabaseTickWaits { try querySentThreadIDs() }, evaluate: { threadIDs in - if !threadIDs.contains(where: { $0 == nil }) || Date() >= deadline { + if !threadIDs.contains(nil) || Date() >= deadline { return .finished(threadIDs) } return .waitingUntil(deadline) @@ -157,13 +158,8 @@ enum DatabaseTickWaits { try await Task.sleep(forTimeInterval: sleepTime) } - do { - _ = try await group.next() - group.cancelAll() - } catch { - group.cancelAll() - throw error - } + defer { group.cancelAll() } + _ = try await group.next() } } } diff --git a/src/IMessage/Sources/IMessage/PlatformAPI.swift b/src/IMessage/Sources/IMessage/PlatformAPI.swift index 81adabcc..f74cbaad 100644 --- a/src/IMessage/Sources/IMessage/PlatformAPI.swift +++ b/src/IMessage/Sources/IMessage/PlatformAPI.swift @@ -21,23 +21,13 @@ private final class PlatformAPIDatabase: @unchecked Sendable { } func changeTopic() throws -> Topic { - // Snapshot the db reference under the lock and atomically claim the setup - // slot, then run the external setup call OUTSIDE the lock: - // beginListeningForChanges opens FDs, creates an FSEventStream, and - // spawns a Task, none of which should run while we hold the lock. - // - // The claim must be atomic: IMDatabase's watcher/debouncer state is NOT - // thread-safe, so two callers must never run beginListeningForChanges - // concurrently. We transition `.notStarted` -> `.starting` under the lock; - // only the winner runs setup. The loser returns the (already-created) - // topic immediately and relies on the 1.0s backstop in DatabaseTickWaits - // for correctness until the winner finishes wiring up the watchers. + // Claim setup once, then run watcher startup outside the state lock. let (db, shouldSetUp) = try state.withLock { state -> (IMDatabase, Bool) in let db = try ensureDatabase(&state) - guard state.listening == .notStarted else { + guard !state.hasAttemptedChangeListeningSetup else { return (db, false) } - state.listening = .starting + state.hasAttemptedChangeListeningSetup = true return (db, true) } @@ -47,14 +37,8 @@ private final class PlatformAPIDatabase: @unchecked Sendable { do { try db.beginListeningForChanges() - state.withLock { $0.listening = .listening } } catch { - // Policy: log once and accept backstop-degraded mode. The topic just - // won't tick, but the 1.0s backstop in DatabaseTickWaits carries - // correctness at poll cadence. We deliberately do NOT retry setup on - // every subsequent call (no backoff) to avoid log/FD churn on a - // persistent failure. - state.withLock { $0.listening = .failed } + // DatabaseTickWaits' backstop polling preserves correctness if filesystem ticks are unavailable. platformLog.error("failed to begin listening for database changes, falling back to backstop polling: \(error)") } return db.changes @@ -70,18 +54,9 @@ private final class PlatformAPIDatabase: @unchecked Sendable { return db } - /// Tracks change-listener setup so we set it up once and don't re-run it on - /// every wait after a persistent failure. - private enum ListeningState { - case notStarted - case starting - case listening - case failed - } - private struct State { var database: IMDatabase? - var listening: ListeningState = .notStarted + var hasAttemptedChangeListeningSetup = false } } diff --git a/src/IMessage/Sources/IMessageCore/Topic.swift b/src/IMessage/Sources/IMessageCore/Topic.swift index 0635b58a..6e2cc131 100644 --- a/src/IMessage/Sources/IMessageCore/Topic.swift +++ b/src/IMessage/Sources/IMessageCore/Topic.swift @@ -38,10 +38,6 @@ public extension Topic { return stream } - package var subscriptionCount: Int { - subscriptions.withLock { $0.count } - } - /** * Finishes all current subscribers and empties the subscriptions list. * @@ -58,3 +54,9 @@ public extension Topic { } } } + +extension Topic { + var subscriptionCount: Int { + subscriptions.withLock { $0.count } + } +} diff --git a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift index 0e4e660f..4f057007 100644 --- a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift +++ b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift @@ -1,7 +1,7 @@ import Foundation import IMDatabase @testable import IMessage -import IMessageCore +@testable import IMessageCore import PlatformSDK import Testing diff --git a/todos.md b/todos.md index 40b5c405..31c1a09a 100644 --- a/todos.md +++ b/todos.md @@ -19,10 +19,12 @@ - [ ] add cross-process coordination when more than one process is driving Messages.app at the same time (e.g. Beeper Desktop + a separate CLI process, or two CLIs w secondary instance off) - [ ] separate `UserDefaults` somehow so that CLI and other consumers don't share the prefs - [ ] user manually killing the messages.app causes cli to not detect that ("Domain=NSOSStatusErrorDomain Code=-600 "procNotFound: no eligible process with specified descriptor"") -- [ ] improve misfire prevention and robustness - [ ] when scheduled messages are actually sent, send a message update event - [ ] perhaps move PlatformSDK to +- [ ] improve misfire prevention and robustness +- [ ] DatabaseTickWaits.{sentMessageIDs,sentThreadIDs} shouldn't exist, we get ServerEvents for new messages, use that. [wip](https://github.com/beeper/platform-imessage/tree/purav/fix-imessage-send-upsert) + - cli - [ ] command to watch chat that prints new activity for just that chat, json new-line separated - [ ] use better library for repl? From 47f685041092e37a76d1938ac46d5192a4ae359d Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 02:42:39 +0530 Subject: [PATCH 09/13] Update todos.md --- todos.md | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/todos.md b/todos.md index 31c1a09a..2cd21bdc 100644 --- a/todos.md +++ b/todos.md @@ -4,7 +4,6 @@ - [ ] publish to homebrew - [ ] add example Swift script that consumes the library -- [ ] add example JS script that consumes the library - [ ] improve permissions prompt, use - [ ] replace SQLite w https://github.com/pointfreeco/sqlite-data and benchmark @@ -13,15 +12,17 @@ - [ ] consider folding callback from `PlatformAPI.onThreadSelected` into `subscribeToEvents` - [ ] [bridgev2](https://github.com/mautrix/go) version for self hosting support -- [ ] run with node instead of electron -- [ ] review for races, `PlatformAPI.messagesController` is mutated without isolation - [ ] add cross-process coordination when more than one process is driving Messages.app at the same time (e.g. Beeper Desktop + a separate CLI process, or two CLIs w secondary instance off) - [ ] separate `UserDefaults` somehow so that CLI and other consumers don't share the prefs - [ ] user manually killing the messages.app causes cli to not detect that ("Domain=NSOSStatusErrorDomain Code=-600 "procNotFound: no eligible process with specified descriptor"") - [ ] when scheduled messages are actually sent, send a message update event - [ ] perhaps move PlatformSDK to +- concurrency + - [ ] review for races, `PlatformAPI.messagesController` is mutated without isolation + - [ ] kill `PassivelyAwareDispatchQueue` + - [ ] improve misfire prevention and robustness - [ ] DatabaseTickWaits.{sentMessageIDs,sentThreadIDs} shouldn't exist, we get ServerEvents for new messages, use that. [wip](https://github.com/beeper/platform-imessage/tree/purav/fix-imessage-send-upsert) @@ -31,20 +32,19 @@ - [ ] autocomplete - [ ] tests -### Parity - -- [ ] add delete message for me command -- [ ] add add/remove group participant command -- [ ] fix sending emoji reactions (🎉) -- [ ] add rename group title command -- [ ] add update group image command -- [ ] add schedule message command -- [ ] add draft message command -- [ ] add leave group command -- [ ] support rich text sending -- [ ] support sending multipart messages (image(s) with caption) -- [ ] fix parsing for multi-part messages w inline stickers -- [ ] [fix real time sync of message deletions (for self, undo send already works)](https://github.com/beeper/platform-imessage/pull/63) +- parity + - [ ] add delete message for me command + - [ ] add add/remove group participant command + - [ ] fix sending emoji reactions (🎉) + - [ ] add rename group title command + - [ ] add update group image command + - [ ] add schedule message command + - [ ] add draft message command + - [ ] add leave group command + - [ ] support rich text sending + - [ ] support sending multipart messages (image(s) with caption) + - [ ] fix parsing for multi-part messages w inline stickers + - [ ] [fix real time sync of message deletions (for self, undo send already works)](https://github.com/beeper/platform-imessage/pull/63) ### Done @@ -75,3 +75,5 @@ - [x] syntax highlight - [x] add download attachment command - [x] map all message edits +- [x] add example JS script that consumes the library +- [ ] ~~run with node instead of electron ~~ From 493aae7f2ff7d8edefc79bae02cdf3cc9c3cff4b Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 03:02:44 +0530 Subject: [PATCH 10/13] fix(imdatabase): retry missing database file watchers --- .../IMDatabase/Database/IMDatabase.swift | 86 ++++++++++++------- 1 file changed, 57 insertions(+), 29 deletions(-) diff --git a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift index d474be3f..eeb96a3e 100644 --- a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift +++ b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift @@ -34,7 +34,7 @@ public final class IMDatabase { // file watchers for `chat.db` and `chat.db-wal`; these need to be // dynamically populated because the WAL can be deleted and (re)created at // any time - private var fileWatchers = [FileWatcher]() + private var fileWatchers = [String: FileWatcher]() private let listenerLock = Protected(()) private var directoryWatcher: FSEventsWatcher? @@ -111,6 +111,12 @@ public extension IMDatabase { } } + func stopListeningForChanges() { + listenerLock.withLock { _ in + stopListeningForChangesLocked() + } + } + private func stopListeningForChangesLocked() { if let directoryWatcher { directoryWatcher.stop() @@ -121,7 +127,7 @@ public extension IMDatabase { debouncer?.cancel() debouncer = nil - for watcher in fileWatchers { + for watcher in fileWatchers.values { watcher.stopListeningIfNecessary() } fileWatchers.removeAll() @@ -174,34 +180,35 @@ public extension IMDatabase { } private func ensureDatabaseFileWatchers(broadcastingTo topic: Topic) throws { - if !fileWatchers.isEmpty { - let allWatchersHaveLinks = fileWatchers.allSatisfy { watcher in - do { - return try watcher.hasHardLinks() == true - } catch { - log.error("couldn't check if \(watcher) has hard links, assuming it does: \(error)") - return false - } - } - - guard !allWatchersHaveLinks else { - log.debug("all file watchers have hard links, leaving them alone") - return - } + let desiredWatchFiles = [ + DatabaseWatchFile(url: chatDatabaseFile(in: messagesDataDirectory), required: true), + DatabaseWatchFile(url: chatDatabaseWalFile(in: messagesDataDirectory), required: false), + ] + let desiredWatchPaths = Set(desiredWatchFiles.map(\.url.path)) + + let staleWatchPaths = fileWatchers.keys.filter { !desiredWatchPaths.contains($0) } + for path in staleWatchPaths { + log.debug("purging stale FileWatcher for \(URL(fileURLWithPath: path).lastPathComponent)") + fileWatchers.removeValue(forKey: path)?.stopListeningIfNecessary() + } - log.debug("at least one file watcher lacks hard links, purging all of em (\(fileWatchers.count))") - // TODO: watchers stop listening in deinit, so maybe this is - // unnecessary assuming we have no refcycles - for watcher in fileWatchers { - watcher.stopListeningIfNecessary() + let unlinkedWatchPaths = fileWatchers.compactMap { path, watcher -> String? in + do { + return try watcher.hasHardLinks() == true ? nil : path + } catch { + log.error("couldn't check if \(watcher) has hard links, recreating it: \(error)") + return path } - fileWatchers.removeAll() + } + for path in unlinkedWatchPaths { + log.debug("purging unlinked FileWatcher for \(URL(fileURLWithPath: path).lastPathComponent)") + fileWatchers.removeValue(forKey: path)?.stopListeningIfNecessary() } - func watchFile(at path: URL) throws { - log.debug("setting up FileWatcher for \(path.lastPathComponent)") + func makeWatcher(for file: URL) throws -> FileWatcher { + log.debug("setting up FileWatcher for \(file.lastPathComponent)") - let watcher = FileWatcher(watching: path) { [weak self] _, event in + let watcher = FileWatcher(watching: file) { [weak self] _, event in guard let self else { return } if noisy { @@ -211,16 +218,37 @@ public extension IMDatabase { } try watcher.beginListening() - fileWatchers.append(watcher) + return watcher } - // watch `.db`/`.db-wal` files for changes - try watchFile(at: chatDatabaseFile(in: messagesDataDirectory)) - try watchFile(at: chatDatabaseWalFile(in: messagesDataDirectory)) + var newWatchers = [String: FileWatcher]() + for file in desiredWatchFiles where fileWatchers[file.url.path] == nil { + do { + newWatchers[file.url.path] = try makeWatcher(for: file.url) + } catch { + if file.required { + log.debug("failed to set up required database file watcher, cleaning up \(newWatchers.count) new watcher(s)") + for watcher in newWatchers.values { + watcher.stopListeningIfNecessary() + } + throw error + } + log.debug("could not watch optional \(file.url.lastPathComponent); will retry on directory events: \(error)") + } + } + + for (path, watcher) in newWatchers { + fileWatchers[path] = watcher + } log.debug("watcher count after ensuring: \(fileWatchers.count)") } + private struct DatabaseWatchFile { + var url: URL + var required: Bool + } + private func broadcastDebouncedChanges(from topic: Topic) async throws { var broadcaster: Task? From c76a431430d2f31c3a0659758f426dab81738cf2 Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 03:02:48 +0530 Subject: [PATCH 11/13] fix(platform): retry database change listener startup --- .../Sources/IMessage/PlatformAPI.swift | 42 ++++++++++++++++--- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/src/IMessage/Sources/IMessage/PlatformAPI.swift b/src/IMessage/Sources/IMessage/PlatformAPI.swift index f74cbaad..400bf7c9 100644 --- a/src/IMessage/Sources/IMessage/PlatformAPI.swift +++ b/src/IMessage/Sources/IMessage/PlatformAPI.swift @@ -24,11 +24,14 @@ private final class PlatformAPIDatabase: @unchecked Sendable { // Claim setup once, then run watcher startup outside the state lock. let (db, shouldSetUp) = try state.withLock { state -> (IMDatabase, Bool) in let db = try ensureDatabase(&state) - guard !state.hasAttemptedChangeListeningSetup else { + + switch state.changeListeningSetup { + case .notStarted, .failed: + state.changeListeningSetup = .starting + return (db, true) + case .starting, .listening: return (db, false) } - state.hasAttemptedChangeListeningSetup = true - return (db, true) } guard shouldSetUp else { @@ -37,13 +40,32 @@ private final class PlatformAPIDatabase: @unchecked Sendable { do { try db.beginListeningForChanges() + state.withLock { state in + guard state.database === db, state.changeListeningSetup == .starting else { return } + state.changeListeningSetup = .listening + } } catch { - // DatabaseTickWaits' backstop polling preserves correctness if filesystem ticks are unavailable. - platformLog.error("failed to begin listening for database changes, falling back to backstop polling: \(error)") + state.withLock { state in + guard state.database === db, state.changeListeningSetup == .starting else { return } + state.changeListeningSetup = .failed + } + // DatabaseTickWaits' backstop polling preserves correctness until a later call retries listener setup. + platformLog.error("failed to begin listening for database changes, using backstop polling until retry: \(error)") } return db.changes } + func stopListeningAndReset() { + let db = state.withLock { state -> IMDatabase? in + let db = state.database + state.database = nil + state.changeListeningSetup = .notStarted + return db + } + + db?.stopListeningForChanges() + } + /// Lazily opens the process-wide database, caching it on `state`. private func ensureDatabase(_ state: inout State) throws -> IMDatabase { if let cached = state.database { @@ -54,9 +76,16 @@ private final class PlatformAPIDatabase: @unchecked Sendable { return db } + private enum ChangeListeningSetupState: Equatable { + case notStarted + case starting + case listening + case failed + } + private struct State { var database: IMDatabase? - var hasAttemptedChangeListeningSetup = false + var changeListeningSetup: ChangeListeningSetupState = .notStarted } } @@ -610,6 +639,7 @@ public final class PlatformAPI { currentUserCache.withLock { $0 = nil } SystemSettingsOnboarding.stop() await EventWatcherLifecycle.shared.cancelWatchingIfNecessary(clearEventCallback: true) + database.stopListeningAndReset() try await disposeCachedMessagesController() } From acb5d4614f0dab8021cf0afcd7701baf853bb3c3 Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 03:06:58 +0530 Subject: [PATCH 12/13] refactor(platform): collapse listener setup retry state --- src/IMessage/Sources/IMessage/PlatformAPI.swift | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/IMessage/Sources/IMessage/PlatformAPI.swift b/src/IMessage/Sources/IMessage/PlatformAPI.swift index 400bf7c9..120f9488 100644 --- a/src/IMessage/Sources/IMessage/PlatformAPI.swift +++ b/src/IMessage/Sources/IMessage/PlatformAPI.swift @@ -26,7 +26,7 @@ private final class PlatformAPIDatabase: @unchecked Sendable { let db = try ensureDatabase(&state) switch state.changeListeningSetup { - case .notStarted, .failed: + case .notStarted: state.changeListeningSetup = .starting return (db, true) case .starting, .listening: @@ -47,7 +47,7 @@ private final class PlatformAPIDatabase: @unchecked Sendable { } catch { state.withLock { state in guard state.database === db, state.changeListeningSetup == .starting else { return } - state.changeListeningSetup = .failed + state.changeListeningSetup = .notStarted } // DatabaseTickWaits' backstop polling preserves correctness until a later call retries listener setup. platformLog.error("failed to begin listening for database changes, using backstop polling until retry: \(error)") @@ -80,7 +80,6 @@ private final class PlatformAPIDatabase: @unchecked Sendable { case notStarted case starting case listening - case failed } private struct State { From 9da67ee87cbd200f079771519014a641093a9366 Mon Sep 17 00:00:00 2001 From: Kishan Bagaria <1093313+KishanBagaria@users.noreply.github.com> Date: Sun, 24 May 2026 11:01:10 +0530 Subject: [PATCH 13/13] simplify --- .../IMDatabase/Database/IMDatabase.swift | 29 +++--- .../Sources/IMessage/DatabaseTickWaits.swift | 13 +++ .../Sources/IMessage/PlatformAPI.swift | 33 +------ src/IMessage/Sources/IMessageCore/Topic.swift | 4 +- .../IMessageTests/DatabaseTickWaitTests.swift | 88 +++++++++++++------ 5 files changed, 97 insertions(+), 70 deletions(-) diff --git a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift index eeb96a3e..317f2f52 100644 --- a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift +++ b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift @@ -96,18 +96,8 @@ private extension IMDatabase { public extension IMDatabase { func beginListeningForChanges() throws { try listenerLock.withLock { _ in - log.info("setting up filesystem watchers") - - stopListeningForChangesLocked() - - let unthrottledChanges = Topic() - - do { - try setUpListeners(unthrottledChanges: unthrottledChanges) - } catch { - stopListeningForChangesLocked() - throw error - } + guard directoryWatcher == nil else { return } + try startListeningForChangesLocked() } } @@ -133,6 +123,21 @@ public extension IMDatabase { fileWatchers.removeAll() } + private func startListeningForChangesLocked() throws { + log.info("setting up filesystem watchers") + + stopListeningForChangesLocked() + + let unthrottledChanges = Topic() + + do { + try setUpListeners(unthrottledChanges: unthrottledChanges) + } catch { + stopListeningForChangesLocked() + throw error + } + } + private func setUpListeners(unthrottledChanges: Topic) throws { // listen to ~/Library/Messages itself in order to respond to the WAL // file being (re)created/deleted diff --git a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift index 1072b6e4..84a1361f 100644 --- a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift +++ b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift @@ -8,6 +8,7 @@ private let sentMessageLinkWaitTimeout: TimeInterval = 1.5 // Re-query at least this often even without a tick: FSEvents notifications can be // dropped or coalesced, so a missed tick costs ~1s instead of the full timeout. private let databaseTickBackstopInterval: TimeInterval = 1.0 +private let loadedAttachmentMinimumRequeryInterval: TimeInterval = 0.25 enum DatabaseTickWaits { typealias SentMessageID = (rowID: Int, guid: String) @@ -87,6 +88,7 @@ enum DatabaseTickWaits { timeout: TimeInterval, changes: Topic, backstopInterval: TimeInterval = databaseTickBackstopInterval, + minimumRequeryInterval: TimeInterval = loadedAttachmentMinimumRequeryInterval, loadMessage: @escaping @Sendable () async throws -> PlatformSDK.Message?, terminalAttachmentFailureState: @escaping @Sendable () async throws -> Attachment.IMFileTransferState? ) async throws -> PlatformSDK.Message { @@ -96,6 +98,7 @@ enum DatabaseTickWaits { return try await waitForDatabaseResult( changes: changes, backstopInterval: backstopInterval, + minimumRequeryInterval: minimumRequeryInterval, query: { try await loadMessage() .orThrow(ErrorMessage("Could not find message \(messageID)")) @@ -128,6 +131,7 @@ enum DatabaseTickWaits { private static func waitForDatabaseResult( changes: Topic, backstopInterval: TimeInterval, + minimumRequeryInterval: TimeInterval = 0, query: @escaping @Sendable () async throws -> T, evaluate: (T) async throws -> WaitResult ) async throws -> T { @@ -138,7 +142,9 @@ enum DatabaseTickWaits { case let .finished(value): return value case let .waitingUntil(deadline): + let earliestNextQuery = Date().addingTimeInterval(minimumRequeryInterval) try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval) + try await waitUntil(earliestNextQuery, cappedAt: deadline) } } } @@ -162,4 +168,11 @@ enum DatabaseTickWaits { _ = try await group.next() } } + + private static func waitUntil(_ date: Date, cappedAt deadline: Date) async throws { + let sleepUntil = min(date, deadline) + let remainingTime = sleepUntil.timeIntervalSinceNow + guard remainingTime > 0 else { return } + try await Task.sleep(forTimeInterval: remainingTime) + } } diff --git a/src/IMessage/Sources/IMessage/PlatformAPI.swift b/src/IMessage/Sources/IMessage/PlatformAPI.swift index 120f9488..3b43687c 100644 --- a/src/IMessage/Sources/IMessage/PlatformAPI.swift +++ b/src/IMessage/Sources/IMessage/PlatformAPI.swift @@ -21,34 +21,13 @@ private final class PlatformAPIDatabase: @unchecked Sendable { } func changeTopic() throws -> Topic { - // Claim setup once, then run watcher startup outside the state lock. - let (db, shouldSetUp) = try state.withLock { state -> (IMDatabase, Bool) in - let db = try ensureDatabase(&state) - - switch state.changeListeningSetup { - case .notStarted: - state.changeListeningSetup = .starting - return (db, true) - case .starting, .listening: - return (db, false) - } - } - - guard shouldSetUp else { - return db.changes + let db = try state.withLock { state in + try ensureDatabase(&state) } do { try db.beginListeningForChanges() - state.withLock { state in - guard state.database === db, state.changeListeningSetup == .starting else { return } - state.changeListeningSetup = .listening - } } catch { - state.withLock { state in - guard state.database === db, state.changeListeningSetup == .starting else { return } - state.changeListeningSetup = .notStarted - } // DatabaseTickWaits' backstop polling preserves correctness until a later call retries listener setup. platformLog.error("failed to begin listening for database changes, using backstop polling until retry: \(error)") } @@ -59,7 +38,6 @@ private final class PlatformAPIDatabase: @unchecked Sendable { let db = state.withLock { state -> IMDatabase? in let db = state.database state.database = nil - state.changeListeningSetup = .notStarted return db } @@ -76,15 +54,8 @@ private final class PlatformAPIDatabase: @unchecked Sendable { return db } - private enum ChangeListeningSetupState: Equatable { - case notStarted - case starting - case listening - } - private struct State { var database: IMDatabase? - var changeListeningSetup: ChangeListeningSetupState = .notStarted } } diff --git a/src/IMessage/Sources/IMessageCore/Topic.swift b/src/IMessage/Sources/IMessageCore/Topic.swift index 6e2cc131..98d00802 100644 --- a/src/IMessage/Sources/IMessageCore/Topic.swift +++ b/src/IMessage/Sources/IMessageCore/Topic.swift @@ -56,7 +56,9 @@ public extension Topic { } extension Topic { - var subscriptionCount: Int { + // Internal test hook for leak/deadlock coverage; production callers should + // treat Topic as an opaque broadcaster. + var testingSubscriptionCount: Int { subscriptions.withLock { $0.count } } } diff --git a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift index 4f057007..c7174c42 100644 --- a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift +++ b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift @@ -11,10 +11,10 @@ import Testing for await _ in topic.subscribe() {} } - #expect(await eventually { topic.subscriptionCount == 1 }) + #expect(await eventually { topic.testingSubscriptionCount == 1 }) task.cancel() await task.value - #expect(await eventually { topic.subscriptionCount == 0 }) + #expect(await eventually { topic.testingSubscriptionCount == 0 }) } @Test func sentMessageIDWaitReadsAgainOnlyAfterDatabaseTick() async throws { @@ -45,7 +45,7 @@ import Testing let result = try await task.value #expect(result.map(\.rowID) == [11]) #expect(queryCount.read() == 2) - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } // Proves the backstop re-queries WITHOUT any broadcast(()): the result flips @@ -53,17 +53,9 @@ import Testing // well before the much-longer timeout. @Test func sentMessageIDWaitReQueriesOnBackstopWithoutTick() async throws { let changes = Topic() - let sentRows = Protected<[DatabaseTickWaits.SentMessageID]>([]) let queryCount = Protected(0) let startedAt = Date() - Task { - try? await Task.sleep(forTimeInterval: 0.15) - sentRows.withLock { - $0 = [(rowID: 11, guid: "message-11")] - } - } - let result = try await DatabaseTickWaits.sentMessageIDs( text: nil, timeout: 5, @@ -71,14 +63,16 @@ import Testing backstopInterval: 0.1 ) { queryCount.withLock { $0 += 1 } - return sentRows.read() + return Date().timeIntervalSince(startedAt) >= 0.15 + ? [(rowID: 11, guid: "message-11")] + : [] } #expect(result.map(\.rowID) == [11]) #expect(queryCount.read() >= 2) // Never broadcast: the backstop alone must have driven the re-query. #expect(Date().timeIntervalSince(startedAt) < 4.0) - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } // Proves caller cancellation propagates: a never-satisfied wait with a long @@ -96,7 +90,7 @@ import Testing } } - #expect(await eventually { changes.subscriptionCount == 1 }) + #expect(await eventually { changes.testingSubscriptionCount == 1 }) task.cancel() let startedAt = Date() @@ -104,7 +98,7 @@ import Testing _ = try await task.value } #expect(Date().timeIntervalSince(startedAt) < 1.0) - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } @Test func sentMessageIDWaitReturnsPartialLinkSendAfterLinkTimeout() async throws { @@ -125,7 +119,7 @@ import Testing #expect(result.map(\.rowID) == [11]) #expect(queryCount.read() == 2) #expect(Date().timeIntervalSince(startedAt) < 4.0) - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } @Test func sentThreadIDWaitReadsAgainOnlyAfterDatabaseTick() async throws { @@ -155,7 +149,7 @@ import Testing let result = try await task.value #expect(result == ["thread-1"]) #expect(queryCount.read() == 2) - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } @Test func loadedAttachmentWaitReadsAgainOnlyAfterDatabaseTick() async throws { @@ -190,7 +184,49 @@ import Testing let result = try await task.value #expect(result.attachments?.first?.loading == false) #expect(queryCount.read() == 2) - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +@Test func loadedAttachmentWaitCoalescesNoisyTicksUntilMinimumRequeryInterval() async throws { + let changes = Topic() + let currentMessage = Protected(messageWithAttachmentLoading(true)) + let queryCount = Protected(0) + + let task = Task { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 1, + changes: changes, + backstopInterval: 1, + minimumRequeryInterval: 0.5, + loadMessage: { + queryCount.withLock { $0 += 1 } + return currentMessage.read() + }, + terminalAttachmentFailureState: { + nil + } + ) + } + + #expect(await eventually { queryCount.read() == 1 && changes.testingSubscriptionCount == 1 }) + + for _ in 0 ..< 5 { + changes.broadcast(()) + try await Task.sleep(forTimeInterval: 0.01) + } + try await Task.sleep(forTimeInterval: 0.1) + #expect(queryCount.read() == 1) + + currentMessage.withLock { + $0 = messageWithAttachmentLoading(false) + } + changes.broadcast(()) + + let result = try await task.value + #expect(result.attachments?.first?.loading == false) + #expect(queryCount.read() == 2) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } @Test func sentMessageIDWaitThrowsOnTimeoutWithoutTick() async throws { @@ -204,7 +240,7 @@ import Testing [] } } - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } @Test func sentThreadIDWaitReturnsPartialAfterDeadline() async throws { @@ -217,7 +253,7 @@ import Testing } #expect(result == [nil, "thread-2"]) - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } @Test func loadedAttachmentWaitThrowsWhenMessageHasNoAttachments() async throws { @@ -231,7 +267,7 @@ import Testing terminalAttachmentFailureState: { nil } ) } - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } @Test func loadedAttachmentWaitThrowsWhenMessageNotFound() async throws { @@ -245,7 +281,7 @@ import Testing terminalAttachmentFailureState: { nil } ) } - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } @Test func loadedAttachmentWaitThrowsOnTerminalFailureState() async throws { @@ -259,7 +295,7 @@ import Testing terminalAttachmentFailureState: { Attachment.IMFileTransferState.error } ) } - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } @Test func loadedAttachmentWaitThrowsOnTimeoutWithoutTick() async throws { @@ -273,7 +309,7 @@ import Testing terminalAttachmentFailureState: { nil } ) } - #expect(await eventually { changes.subscriptionCount == 0 }) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) } // Guards the lock-release-before-finish() fix: calling finish() under the lock @@ -281,11 +317,11 @@ import Testing @Test func finishCurrentSubscribersDoesNotDeadlockAndClearsSubscriptions() { let topic = Topic() let streams = (0 ..< 5).map { _ in topic.subscribe() } - #expect(topic.subscriptionCount == 5) + #expect(topic.testingSubscriptionCount == 5) topic.finishCurrentSubscribers() - #expect(topic.subscriptionCount == 0) + #expect(topic.testingSubscriptionCount == 0) withExtendedLifetime(streams) {} }