diff --git a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift index 94ee854e..317f2f52 100644 --- a/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift +++ b/src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift @@ -34,8 +34,10 @@ public final class IMDatabase { // file watchers for `chat.db` and `chat.db-wal`; these need to be // dynamically populated because the WAL can be deleted and (re)created at // any time - private var fileWatchers = [FileWatcher]() + private var fileWatchers = [String: FileWatcher]() + private let listenerLock = Protected(()) + private var directoryWatcher: FSEventsWatcher? private var debouncer: Task? public var noisy = false @@ -72,10 +74,9 @@ public final class IMDatabase { deinit { log.debug("being deallocated, stopping watchers and listeners if necessary") - for watcher in fileWatchers { - watcher.stopListeningIfNecessary() + listenerLock.withLock { _ in + stopListeningForChangesLocked() } - debouncer?.cancel() } } @@ -94,10 +95,50 @@ private extension IMDatabase { public extension IMDatabase { func beginListeningForChanges() throws { + try listenerLock.withLock { _ in + guard directoryWatcher == nil else { return } + try startListeningForChangesLocked() + } + } + + func stopListeningForChanges() { + listenerLock.withLock { _ in + stopListeningForChangesLocked() + } + } + + private func stopListeningForChangesLocked() { + if let directoryWatcher { + directoryWatcher.stop() + directoryWatcher.invalidate() + self.directoryWatcher = nil + } + + debouncer?.cancel() + debouncer = nil + + for watcher in fileWatchers.values { + watcher.stopListeningIfNecessary() + } + fileWatchers.removeAll() + } + + private func startListeningForChangesLocked() throws { log.info("setting up filesystem watchers") + stopListeningForChangesLocked() + let unthrottledChanges = Topic() + do { + try setUpListeners(unthrottledChanges: unthrottledChanges) + } catch { + stopListeningForChangesLocked() + throw error + } + } + + private func setUpListeners(unthrottledChanges: Topic) throws { // listen to ~/Library/Messages itself in order to respond to the WAL // file being (re)created/deleted let directoryWatcher = try FSEventsWatcher(watchingPath: messagesDataDirectory.path, latency: 1.0) { [weak self] _, event in @@ -119,13 +160,16 @@ public extension IMDatabase { log.debug("received FSEvent [\(event.id)] \(anonymizedPath) \(event.flags)") do { - try ensureDatabaseFileWatchers(broadcastingTo: unthrottledChanges) + try listenerLock.withLock { _ in + try self.ensureDatabaseFileWatchers(broadcastingTo: unthrottledChanges) + } } catch { log.error("failed to ensure database file watchers in response to WAL file event: \(error)") } } directoryWatcher.setDispatchQueue(fsEventsQueue) try directoryWatcher.start() + self.directoryWatcher = directoryWatcher try ensureDatabaseFileWatchers(broadcastingTo: unthrottledChanges) @@ -141,34 +185,35 @@ public extension IMDatabase { } private func ensureDatabaseFileWatchers(broadcastingTo topic: Topic) throws { - if !fileWatchers.isEmpty { - let allWatchersHaveLinks = fileWatchers.allSatisfy { watcher in - do { - return try watcher.hasHardLinks() == true - } catch { - log.error("couldn't check if \(watcher) has hard links, assuming it does: \(error)") - return false - } - } - - guard !allWatchersHaveLinks else { - log.debug("all file watchers have hard links, leaving them alone") - return - } + let desiredWatchFiles = [ + DatabaseWatchFile(url: chatDatabaseFile(in: messagesDataDirectory), required: true), + DatabaseWatchFile(url: chatDatabaseWalFile(in: messagesDataDirectory), required: false), + ] + let desiredWatchPaths = Set(desiredWatchFiles.map(\.url.path)) + + let staleWatchPaths = fileWatchers.keys.filter { !desiredWatchPaths.contains($0) } + for path in staleWatchPaths { + log.debug("purging stale FileWatcher for \(URL(fileURLWithPath: path).lastPathComponent)") + fileWatchers.removeValue(forKey: path)?.stopListeningIfNecessary() + } - log.debug("at least one file watcher lacks hard links, purging all of em (\(fileWatchers.count))") - // TODO: watchers stop listening in deinit, so maybe this is - // unnecessary assuming we have no refcycles - for watcher in fileWatchers { - watcher.stopListeningIfNecessary() + let unlinkedWatchPaths = fileWatchers.compactMap { path, watcher -> String? in + do { + return try watcher.hasHardLinks() == true ? nil : path + } catch { + log.error("couldn't check if \(watcher) has hard links, recreating it: \(error)") + return path } - fileWatchers.removeAll() + } + for path in unlinkedWatchPaths { + log.debug("purging unlinked FileWatcher for \(URL(fileURLWithPath: path).lastPathComponent)") + fileWatchers.removeValue(forKey: path)?.stopListeningIfNecessary() } - func watchFile(at path: URL) throws { - log.debug("setting up FileWatcher for \(path.lastPathComponent)") + func makeWatcher(for file: URL) throws -> FileWatcher { + log.debug("setting up FileWatcher for \(file.lastPathComponent)") - let watcher = FileWatcher(watching: path) { [weak self] _, event in + let watcher = FileWatcher(watching: file) { [weak self] _, event in guard let self else { return } if noisy { @@ -178,16 +223,37 @@ public extension IMDatabase { } try watcher.beginListening() - fileWatchers.append(watcher) + return watcher } - // watch `.db`/`.db-wal` files for changes - try watchFile(at: chatDatabaseFile(in: messagesDataDirectory)) - try watchFile(at: chatDatabaseWalFile(in: messagesDataDirectory)) + var newWatchers = [String: FileWatcher]() + for file in desiredWatchFiles where fileWatchers[file.url.path] == nil { + do { + newWatchers[file.url.path] = try makeWatcher(for: file.url) + } catch { + if file.required { + log.debug("failed to set up required database file watcher, cleaning up \(newWatchers.count) new watcher(s)") + for watcher in newWatchers.values { + watcher.stopListeningIfNecessary() + } + throw error + } + log.debug("could not watch optional \(file.url.lastPathComponent); will retry on directory events: \(error)") + } + } + + for (path, watcher) in newWatchers { + fileWatchers[path] = watcher + } log.debug("watcher count after ensuring: \(fileWatchers.count)") } + private struct DatabaseWatchFile { + var url: URL + var required: Bool + } + private func broadcastDebouncedChanges(from topic: Topic) async throws { var broadcaster: Task? diff --git a/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift new file mode 100644 index 00000000..84a1361f --- /dev/null +++ b/src/IMessage/Sources/IMessage/DatabaseTickWaits.swift @@ -0,0 +1,178 @@ +import Foundation +import IMDatabase +import IMessageCore +import PlatformSDK + +private let sentMessageLinkWaitTimeout: TimeInterval = 1.5 + +// Re-query at least this often even without a tick: FSEvents notifications can be +// dropped or coalesced, so a missed tick costs ~1s instead of the full timeout. +private let databaseTickBackstopInterval: TimeInterval = 1.0 +private let loadedAttachmentMinimumRequeryInterval: TimeInterval = 0.25 + +enum DatabaseTickWaits { + typealias SentMessageID = (rowID: Int, guid: String) + + private enum WaitResult { + case finished(T) + case waitingUntil(Date) + } + + static func sentMessageIDs( + text: String?, + timeout: TimeInterval, + changes: Topic, + linkTimeout: TimeInterval = sentMessageLinkWaitTimeout, + backstopInterval: TimeInterval = databaseTickBackstopInterval, + querySentMessageIDs: @escaping @Sendable () throws -> [SentMessageID] + ) async throws -> [SentMessageID] { + let startedAt = Date() + let timeoutDeadline = startedAt.addingTimeInterval(timeout) + let linkDeadline = startedAt.addingTimeInterval(linkTimeout) + let expectedNewMessageIDCount = text.map { max($0.linkCount, 1) } ?? 1 + + return try await waitForDatabaseResult( + changes: changes, + backstopInterval: backstopInterval, + query: { + try querySentMessageIDs() + }, + evaluate: { sentMessageIDs in + let now = Date() + if sentMessageIDs.count == expectedNewMessageIDCount { + return .finished(sentMessageIDs) + } + if text != nil, !sentMessageIDs.isEmpty, now >= linkDeadline { + return .finished(sentMessageIDs) + } + if now >= timeoutDeadline { + throw ErrorMessage("timed out waiting for sent messages") + } + + let wakeDeadline: Date + if text != nil, !sentMessageIDs.isEmpty { + wakeDeadline = min(timeoutDeadline, linkDeadline) + } else { + wakeDeadline = timeoutDeadline + } + return .waitingUntil(wakeDeadline) + } + ) + } + + static func sentThreadIDs( + timeout: TimeInterval, + changes: Topic, + backstopInterval: TimeInterval = databaseTickBackstopInterval, + querySentThreadIDs: @escaping @Sendable () throws -> [String?] + ) async throws -> [String?] { + let deadline = Date().addingTimeInterval(timeout) + + return try await waitForDatabaseResult( + changes: changes, + backstopInterval: backstopInterval, + query: { + try querySentThreadIDs() + }, + evaluate: { threadIDs in + if !threadIDs.contains(nil) || Date() >= deadline { + return .finished(threadIDs) + } + return .waitingUntil(deadline) + } + ) + } + + static func loadedAttachment( + messageID: String, + timeout: TimeInterval, + changes: Topic, + backstopInterval: TimeInterval = databaseTickBackstopInterval, + minimumRequeryInterval: TimeInterval = loadedAttachmentMinimumRequeryInterval, + loadMessage: @escaping @Sendable () async throws -> PlatformSDK.Message?, + terminalAttachmentFailureState: @escaping @Sendable () async throws -> Attachment.IMFileTransferState? + ) async throws -> PlatformSDK.Message { + let deadline = Date().addingTimeInterval(timeout) + var isFirstRead = true + + return try await waitForDatabaseResult( + changes: changes, + backstopInterval: backstopInterval, + minimumRequeryInterval: minimumRequeryInterval, + query: { + try await loadMessage() + .orThrow(ErrorMessage("Could not find message \(messageID)")) + }, + evaluate: { message in + let attachments = message.attachments ?? [] + if isFirstRead { + guard !attachments.isEmpty else { + throw ErrorMessage("Message \(messageID) has no attachments") + } + isFirstRead = false + } + if !attachments.isEmpty, !attachments.contains(where: { $0.loading == true }) { + return .finished(message) + } + + if let failureState = try await terminalAttachmentFailureState() { + throw ErrorMessage("Attachment in message \(messageID) failed to load (transfer state: \(failureState.rawValue))") + } + + guard Date() < deadline else { + throw ErrorMessage("Timed out waiting for attachment in message \(messageID) to load") + } + + return .waitingUntil(deadline) + } + ) + } + + private static func waitForDatabaseResult( + changes: Topic, + backstopInterval: TimeInterval, + minimumRequeryInterval: TimeInterval = 0, + query: @escaping @Sendable () async throws -> T, + evaluate: (T) async throws -> WaitResult + ) async throws -> T { + while true { + let changeStream = changes.subscribe() + let result = try await query() + switch try await evaluate(result) { + case let .finished(value): + return value + case let .waitingUntil(deadline): + let earliestNextQuery = Date().addingTimeInterval(minimumRequeryInterval) + try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval) + try await waitUntil(earliestNextQuery, cappedAt: deadline) + } + } + } + + private static func waitForChange(on stream: AsyncStream, until deadline: Date, backstopInterval: TimeInterval) async throws { + let remainingTime = deadline.timeIntervalSinceNow + guard remainingTime > 0 else { return } + + let sleepTime = min(remainingTime, backstopInterval) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + var iterator = stream.makeAsyncIterator() + _ = await iterator.next() + } + group.addTask { + try await Task.sleep(forTimeInterval: sleepTime) + } + + defer { group.cancelAll() } + _ = try await group.next() + } + } + + private static func waitUntil(_ date: Date, cappedAt deadline: Date) async throws { + let sleepUntil = min(date, deadline) + let remainingTime = sleepUntil.timeIntervalSinceNow + guard remainingTime > 0 else { return } + try await Task.sleep(forTimeInterval: remainingTime) + } +} diff --git a/src/IMessage/Sources/IMessage/PlatformAPI.swift b/src/IMessage/Sources/IMessage/PlatformAPI.swift index e69b615b..3b43687c 100644 --- a/src/IMessage/Sources/IMessage/PlatformAPI.swift +++ b/src/IMessage/Sources/IMessage/PlatformAPI.swift @@ -8,26 +8,54 @@ private let messagePageLimit = 20 private let platformLog = Logger(imessageLabel: "platform-api") private let messageSendTimeout: TimeInterval = 45 private let reactionSendTimeout: TimeInterval = 5 -private let waitForLinksTimeout: TimeInterval = 1.5 private let waitForSentThreadTimeout: TimeInterval = 10 -private let sentMessagePollInterval: TimeInterval = 0.025 private let loadAttachmentTimeout: TimeInterval = 60 -private let loadAttachmentInitialPollInterval: TimeInterval = 0.25 -private let loadAttachmentMaxPollInterval: TimeInterval = 5 private final class PlatformAPIDatabase: @unchecked Sendable { - private let database = Protected() + private let state = Protected(State()) func withDatabase(_ action: (IMDatabase) throws -> T) throws -> T { - try database.withLock { cachedDatabase in - if let cachedDatabase { - return try action(cachedDatabase) - } + try state.withLock { state in + try action(try ensureDatabase(&state)) + } + } + + func changeTopic() throws -> Topic { + let db = try state.withLock { state in + try ensureDatabase(&state) + } - let newDatabase = try IMDatabase(createIndexes: true) - cachedDatabase = newDatabase - return try action(newDatabase) + do { + try db.beginListeningForChanges() + } catch { + // DatabaseTickWaits' backstop polling preserves correctness until a later call retries listener setup. + platformLog.error("failed to begin listening for database changes, using backstop polling until retry: \(error)") } + return db.changes + } + + func stopListeningAndReset() { + let db = state.withLock { state -> IMDatabase? in + let db = state.database + state.database = nil + return db + } + + db?.stopListeningForChanges() + } + + /// Lazily opens the process-wide database, caching it on `state`. + private func ensureDatabase(_ state: inout State) throws -> IMDatabase { + if let cached = state.database { + return cached + } + let db = try IMDatabase(createIndexes: true) + state.database = db + return db + } + + private struct State { + var database: IMDatabase? } } @@ -410,36 +438,18 @@ public final class PlatformAPI { messageID: String, timeout: TimeInterval ) async throws -> PlatformSDK.Message { - let deadline = Date().addingTimeInterval(timeout) - var pollInterval = loadAttachmentInitialPollInterval - var isFirstRead = true - - while true { - let message = try await getMessage(threadID: threadID, messageID: messageID) - .orThrow(ErrorMessage("Could not find message \(messageID)")) - let attachments = message.attachments ?? [] - if isFirstRead { - guard !attachments.isEmpty else { - throw ErrorMessage("Message \(messageID) has no attachments") - } - isFirstRead = false - } - if !attachments.isEmpty, !attachments.contains(where: { $0.loading == true }) { - return message - } - - if let failureState = try await terminalAttachmentFailureState(messageID: messageID) { - throw ErrorMessage("Attachment in message \(messageID) failed to load (transfer state: \(failureState.rawValue))") - } - - let remainingTime = deadline.timeIntervalSinceNow - guard remainingTime > 0 else { - throw ErrorMessage("Timed out waiting for attachment in message \(messageID) to load") + let changes = try database.changeTopic() + return try await DatabaseTickWaits.loadedAttachment( + messageID: messageID, + timeout: timeout, + changes: changes, + loadMessage: { + try await self.getMessage(threadID: threadID, messageID: messageID) + }, + terminalAttachmentFailureState: { + try await self.terminalAttachmentFailureState(messageID: messageID) } - - try await Task.sleep(forTimeInterval: min(pollInterval, remainingTime)) - pollInterval = min(pollInterval * 2, loadAttachmentMaxPollInterval) - } + ) } /// Returns the first attachment state on `messageID` that is terminal failure @@ -599,6 +609,7 @@ public final class PlatformAPI { currentUserCache.withLock { $0 = nil } SystemSettingsOnboarding.stop() await EventWatcherLifecycle.shared.cancelWatchingIfNecessary(clearEventCallback: true) + database.stopListeningAndReset() try await disposeCachedMessagesController() } @@ -764,47 +775,34 @@ public final class PlatformAPI { timeout: TimeInterval ) async throws -> [(rowID: Int, guid: String)] { let database = database - return try await Task.detached(priority: .userInitiated) { - let start = Date() - let expectedNewMessageIDCount = text.map { max($0.linkCount, 1) } ?? 1 - var sentMessageIDs: [(rowID: Int, guid: String)] = [] - while sentMessageIDs.count != expectedNewMessageIDCount { - sentMessageIDs = try database.withDatabase { db in - try db.sentMessageIDs(since: lastRowID) - } - if text != nil, !sentMessageIDs.isEmpty, Date().timeIntervalSince(start) > waitForLinksTimeout { - break - } - if Date().timeIntervalSince(start) > timeout { - throw ErrorMessage("timed out waiting for sent messages") - } - try await Task.sleep(forTimeInterval: sentMessagePollInterval) + let changes = try database.changeTopic() + // Not via Task.detached, so caller cancellation propagates and the wait + // stops promptly instead of running to its own timeout. + return try await DatabaseTickWaits.sentMessageIDs( + text: text, + timeout: timeout, + changes: changes + ) { + try database.withDatabase { db in + try db.sentMessageIDs(since: lastRowID) } - return sentMessageIDs - }.value + } } private func waitForSentThreadIDs(messageRowIDs: [Int]) async throws -> [String?] { let database = database - return try await Task.detached(priority: .userInitiated) { - let sentThreadIDs = { - try messageRowIDs.map { rowID in - try database.withDatabase { db in - try db.threadIDForMessage(rowID: rowID) - } - } - } - var threadIDs = try sentThreadIDs() - let start = Date() - while threadIDs.contains(where: { $0 == nil }) { - try await Task.sleep(forTimeInterval: sentMessagePollInterval) - threadIDs = try sentThreadIDs() - if Date().timeIntervalSince(start) > waitForSentThreadTimeout { - break + let changes = try database.changeTopic() + // Not via Task.detached, so caller cancellation propagates. + return try await DatabaseTickWaits.sentThreadIDs( + timeout: waitForSentThreadTimeout, + changes: changes + ) { + try messageRowIDs.map { rowID in + try database.withDatabase { db in + try db.threadIDForMessage(rowID: rowID) } } - return threadIDs - }.value + } } private func sentMessages(_ sentMessageIDs: [(rowID: Int, guid: String)]) async throws -> [PlatformSDK.Message] { diff --git a/src/IMessage/Sources/IMessageCore/Topic.swift b/src/IMessage/Sources/IMessageCore/Topic.swift index 33959af2..98d00802 100644 --- a/src/IMessage/Sources/IMessageCore/Topic.swift +++ b/src/IMessage/Sources/IMessageCore/Topic.swift @@ -1,8 +1,10 @@ +import Foundation + public final class Topic { public typealias BufferingPolicy = AsyncStream.Continuation.BufferingPolicy private let bufferingPolicy: BufferingPolicy - private var subscriptions = Protected<[AsyncStream.Continuation]>([]) + private var subscriptions = Protected<[UUID: AsyncStream.Continuation]>([:]) public init(bufferingPolicy: BufferingPolicy = .unbounded) { self.bufferingPolicy = bufferingPolicy @@ -13,17 +15,24 @@ extension Topic: @unchecked Sendable {} public extension Topic { func broadcast(_ value: sending T) { - subscriptions.withLock { - for subscription in $0 { - subscription.yield(value) - } + let currentSubscriptions = subscriptions.withLock { + Array($0.values) + } + for subscription in currentSubscriptions { + subscription.yield(value) } } func subscribe() -> AsyncStream { + let id = UUID() let (stream, cont) = AsyncStream.makeStream(of: T.self, bufferingPolicy: bufferingPolicy) + cont.onTermination = { [weak self] _ in + self?.subscriptions.withLock { + $0[id] = nil + } + } subscriptions.withLock { - $0.append(cont) + $0[id] = cont } return stream @@ -35,11 +44,21 @@ public extension Topic { * New subscribers may still be registered after calling this method. */ func finishCurrentSubscribers() { - subscriptions.withLock { - for subscription in $0 { - subscription.finish() - } + let currentSubscriptions = subscriptions.withLock { + let current = Array($0.values) $0.removeAll() + return current + } + for subscription in currentSubscriptions { + subscription.finish() } } } + +extension Topic { + // Internal test hook for leak/deadlock coverage; production callers should + // treat Topic as an opaque broadcaster. + var testingSubscriptionCount: Int { + subscriptions.withLock { $0.count } + } +} diff --git a/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift new file mode 100644 index 00000000..c7174c42 --- /dev/null +++ b/src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift @@ -0,0 +1,361 @@ +import Foundation +import IMDatabase +@testable import IMessage +@testable import IMessageCore +import PlatformSDK +import Testing + +@Test func topicRemovesTerminatedSubscriptions() async throws { + let topic = Topic() + let task = Task { + for await _ in topic.subscribe() {} + } + + #expect(await eventually { topic.testingSubscriptionCount == 1 }) + task.cancel() + await task.value + #expect(await eventually { topic.testingSubscriptionCount == 0 }) +} + +@Test func sentMessageIDWaitReadsAgainOnlyAfterDatabaseTick() async throws { + let changes = Topic() + let sentRows = Protected<[DatabaseTickWaits.SentMessageID]>([]) + let queryCount = Protected(0) + + let task = Task { + try await DatabaseTickWaits.sentMessageIDs( + text: nil, + timeout: 1, + changes: changes + ) { + queryCount.withLock { $0 += 1 } + return sentRows.read() + } + } + + #expect(await eventually { queryCount.read() == 1 }) + try await Task.sleep(forTimeInterval: 0.1) + #expect(queryCount.read() == 1) + + sentRows.withLock { + $0 = [(rowID: 11, guid: "message-11")] + } + changes.broadcast(()) + + let result = try await task.value + #expect(result.map(\.rowID) == [11]) + #expect(queryCount.read() == 2) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +// Proves the backstop re-queries WITHOUT any broadcast(()): the result flips +// after the (short, injected) backstop interval and is picked up by the poll, +// well before the much-longer timeout. +@Test func sentMessageIDWaitReQueriesOnBackstopWithoutTick() async throws { + let changes = Topic() + let queryCount = Protected(0) + let startedAt = Date() + + let result = try await DatabaseTickWaits.sentMessageIDs( + text: nil, + timeout: 5, + changes: changes, + backstopInterval: 0.1 + ) { + queryCount.withLock { $0 += 1 } + return Date().timeIntervalSince(startedAt) >= 0.15 + ? [(rowID: 11, guid: "message-11")] + : [] + } + + #expect(result.map(\.rowID) == [11]) + #expect(queryCount.read() >= 2) + // Never broadcast: the backstop alone must have driven the re-query. + #expect(Date().timeIntervalSince(startedAt) < 4.0) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +// Proves caller cancellation propagates: a never-satisfied wait with a long +// timeout finishes promptly when its task is cancelled, and unsubscribes. +@Test func sentMessageIDWaitPropagatesCallerCancellation() async throws { + let changes = Topic() + + let task = Task { + try await DatabaseTickWaits.sentMessageIDs( + text: nil, + timeout: 5, + changes: changes + ) { + [] + } + } + + #expect(await eventually { changes.testingSubscriptionCount == 1 }) + task.cancel() + + let startedAt = Date() + await #expect(throws: (any Error).self) { + _ = try await task.value + } + #expect(Date().timeIntervalSince(startedAt) < 1.0) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +@Test func sentMessageIDWaitReturnsPartialLinkSendAfterLinkTimeout() async throws { + let changes = Topic() + let queryCount = Protected(0) + let startedAt = Date() + + let result = try await DatabaseTickWaits.sentMessageIDs( + text: "https://one.example https://two.example", + timeout: 5, + changes: changes, + linkTimeout: 0.05 + ) { + queryCount.withLock { $0 += 1 } + return [(rowID: 11, guid: "message-11")] + } + + #expect(result.map(\.rowID) == [11]) + #expect(queryCount.read() == 2) + #expect(Date().timeIntervalSince(startedAt) < 4.0) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +@Test func sentThreadIDWaitReadsAgainOnlyAfterDatabaseTick() async throws { + let changes = Topic() + let threadIDs = Protected<[String?]>([nil]) + let queryCount = Protected(0) + + let task = Task { + try await DatabaseTickWaits.sentThreadIDs( + timeout: 1, + changes: changes + ) { + queryCount.withLock { $0 += 1 } + return threadIDs.read() + } + } + + #expect(await eventually { queryCount.read() == 1 }) + try await Task.sleep(forTimeInterval: 0.1) + #expect(queryCount.read() == 1) + + threadIDs.withLock { + $0 = ["thread-1"] + } + changes.broadcast(()) + + let result = try await task.value + #expect(result == ["thread-1"]) + #expect(queryCount.read() == 2) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +@Test func loadedAttachmentWaitReadsAgainOnlyAfterDatabaseTick() async throws { + let changes = Topic() + let currentMessage = Protected(messageWithAttachmentLoading(true)) + let queryCount = Protected(0) + + let task = Task { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 1, + changes: changes, + loadMessage: { + queryCount.withLock { $0 += 1 } + return currentMessage.read() + }, + terminalAttachmentFailureState: { + nil + } + ) + } + + #expect(await eventually { queryCount.read() == 1 }) + try await Task.sleep(forTimeInterval: 0.1) + #expect(queryCount.read() == 1) + + currentMessage.withLock { + $0 = messageWithAttachmentLoading(false) + } + changes.broadcast(()) + + let result = try await task.value + #expect(result.attachments?.first?.loading == false) + #expect(queryCount.read() == 2) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +@Test func loadedAttachmentWaitCoalescesNoisyTicksUntilMinimumRequeryInterval() async throws { + let changes = Topic() + let currentMessage = Protected(messageWithAttachmentLoading(true)) + let queryCount = Protected(0) + + let task = Task { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 1, + changes: changes, + backstopInterval: 1, + minimumRequeryInterval: 0.5, + loadMessage: { + queryCount.withLock { $0 += 1 } + return currentMessage.read() + }, + terminalAttachmentFailureState: { + nil + } + ) + } + + #expect(await eventually { queryCount.read() == 1 && changes.testingSubscriptionCount == 1 }) + + for _ in 0 ..< 5 { + changes.broadcast(()) + try await Task.sleep(forTimeInterval: 0.01) + } + try await Task.sleep(forTimeInterval: 0.1) + #expect(queryCount.read() == 1) + + currentMessage.withLock { + $0 = messageWithAttachmentLoading(false) + } + changes.broadcast(()) + + let result = try await task.value + #expect(result.attachments?.first?.loading == false) + #expect(queryCount.read() == 2) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +@Test func sentMessageIDWaitThrowsOnTimeoutWithoutTick() async throws { + let changes = Topic() + await #expect(throws: (any Error).self) { + try await DatabaseTickWaits.sentMessageIDs( + text: nil, + timeout: 0.2, + changes: changes + ) { + [] + } + } + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +@Test func sentThreadIDWaitReturnsPartialAfterDeadline() async throws { + let changes = Topic() + let result = try await DatabaseTickWaits.sentThreadIDs( + timeout: 0.2, + changes: changes + ) { + [nil, "thread-2"] + } + + #expect(result == [nil, "thread-2"]) + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +@Test func loadedAttachmentWaitThrowsWhenMessageHasNoAttachments() async throws { + let changes = Topic() + await #expect(throws: (any Error).self) { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 1, + changes: changes, + loadMessage: { messageWithNoAttachments() }, + terminalAttachmentFailureState: { nil } + ) + } + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +@Test func loadedAttachmentWaitThrowsWhenMessageNotFound() async throws { + let changes = Topic() + await #expect(throws: (any Error).self) { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 1, + changes: changes, + loadMessage: { nil }, + terminalAttachmentFailureState: { nil } + ) + } + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +@Test func loadedAttachmentWaitThrowsOnTerminalFailureState() async throws { + let changes = Topic() + await #expect(throws: (any Error).self) { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 1, + changes: changes, + loadMessage: { messageWithAttachmentLoading(true) }, + terminalAttachmentFailureState: { Attachment.IMFileTransferState.error } + ) + } + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +@Test func loadedAttachmentWaitThrowsOnTimeoutWithoutTick() async throws { + let changes = Topic() + await #expect(throws: (any Error).self) { + try await DatabaseTickWaits.loadedAttachment( + messageID: "message-1", + timeout: 0.2, + changes: changes, + loadMessage: { messageWithAttachmentLoading(true) }, + terminalAttachmentFailureState: { nil } + ) + } + #expect(await eventually { changes.testingSubscriptionCount == 0 }) +} + +// Guards the lock-release-before-finish() fix: calling finish() under the lock +// would re-enter onTermination on the non-reentrant os_unfair_lock and deadlock. +@Test func finishCurrentSubscribersDoesNotDeadlockAndClearsSubscriptions() { + let topic = Topic() + let streams = (0 ..< 5).map { _ in topic.subscribe() } + #expect(topic.testingSubscriptionCount == 5) + + topic.finishCurrentSubscribers() + + #expect(topic.testingSubscriptionCount == 0) + withExtendedLifetime(streams) {} +} + +private func messageWithNoAttachments() -> PlatformSDK.Message { + PlatformSDK.Message( + id: "message-1", + timestamp: 1, + senderID: "sender-1", + attachments: [] + ) +} + +private func messageWithAttachmentLoading(_ loading: Bool) -> PlatformSDK.Message { + PlatformSDK.Message( + id: "message-1", + timestamp: 1, + senderID: "sender-1", + attachments: [ + PlatformSDK.Attachment( + id: "attachment-1", + type: .img, + loading: loading + ), + ] + ) +} + +private func eventually(timeout: TimeInterval = 1, _ predicate: () -> Bool) async -> Bool { + let deadline = Date().addingTimeInterval(timeout) + while Date() < deadline { + if predicate() { + return true + } + try? await Task.sleep(forTimeInterval: 0.01) + } + return predicate() +} diff --git a/todos.md b/todos.md index 40b5c405..2cd21bdc 100644 --- a/todos.md +++ b/todos.md @@ -4,7 +4,6 @@ - [ ] publish to homebrew - [ ] add example Swift script that consumes the library -- [ ] add example JS script that consumes the library - [ ] improve permissions prompt, use - [ ] replace SQLite w https://github.com/pointfreeco/sqlite-data and benchmark @@ -13,36 +12,39 @@ - [ ] consider folding callback from `PlatformAPI.onThreadSelected` into `subscribeToEvents` - [ ] [bridgev2](https://github.com/mautrix/go) version for self hosting support -- [ ] run with node instead of electron -- [ ] review for races, `PlatformAPI.messagesController` is mutated without isolation - [ ] add cross-process coordination when more than one process is driving Messages.app at the same time (e.g. Beeper Desktop + a separate CLI process, or two CLIs w secondary instance off) - [ ] separate `UserDefaults` somehow so that CLI and other consumers don't share the prefs - [ ] user manually killing the messages.app causes cli to not detect that ("Domain=NSOSStatusErrorDomain Code=-600 "procNotFound: no eligible process with specified descriptor"") -- [ ] improve misfire prevention and robustness - [ ] when scheduled messages are actually sent, send a message update event - [ ] perhaps move PlatformSDK to +- concurrency + - [ ] review for races, `PlatformAPI.messagesController` is mutated without isolation + - [ ] kill `PassivelyAwareDispatchQueue` + +- [ ] improve misfire prevention and robustness +- [ ] DatabaseTickWaits.{sentMessageIDs,sentThreadIDs} shouldn't exist, we get ServerEvents for new messages, use that. [wip](https://github.com/beeper/platform-imessage/tree/purav/fix-imessage-send-upsert) + - cli - [ ] command to watch chat that prints new activity for just that chat, json new-line separated - [ ] use better library for repl? - [ ] autocomplete - [ ] tests -### Parity - -- [ ] add delete message for me command -- [ ] add add/remove group participant command -- [ ] fix sending emoji reactions (🎉) -- [ ] add rename group title command -- [ ] add update group image command -- [ ] add schedule message command -- [ ] add draft message command -- [ ] add leave group command -- [ ] support rich text sending -- [ ] support sending multipart messages (image(s) with caption) -- [ ] fix parsing for multi-part messages w inline stickers -- [ ] [fix real time sync of message deletions (for self, undo send already works)](https://github.com/beeper/platform-imessage/pull/63) +- parity + - [ ] add delete message for me command + - [ ] add add/remove group participant command + - [ ] fix sending emoji reactions (🎉) + - [ ] add rename group title command + - [ ] add update group image command + - [ ] add schedule message command + - [ ] add draft message command + - [ ] add leave group command + - [ ] support rich text sending + - [ ] support sending multipart messages (image(s) with caption) + - [ ] fix parsing for multi-part messages w inline stickers + - [ ] [fix real time sync of message deletions (for self, undo send already works)](https://github.com/beeper/platform-imessage/pull/63) ### Done @@ -73,3 +75,5 @@ - [x] syntax highlight - [x] add download attachment command - [x] map all message edits +- [x] add example JS script that consumes the library +- [ ] ~~run with node instead of electron ~~