Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,41 @@ public extension IMDatabase {
func beginListeningForChanges() throws {
log.info("setting up filesystem watchers")

// Idempotent: a prior (possibly partial) setup may have left a debouncer
// Task and file watchers behind. Tear them down first so a retry doesn't
// leak the old debouncer Task or double-register watchers.
debouncer?.cancel()
debouncer = nil
for watcher in fileWatchers {
watcher.stopListeningIfNecessary()
}
fileWatchers.removeAll()

let unthrottledChanges = Topic<Void>()

// If setup throws partway, clean up whatever we managed to register so a
// later retry starts from a clean slate instead of half-wired watchers.
var directoryWatcher: FSEventsWatcher?
func cleanupPartialSetup() {
if let directoryWatcher {
directoryWatcher.stop()
directoryWatcher.invalidate()
}
for watcher in fileWatchers {
watcher.stopListeningIfNecessary()
}
fileWatchers.removeAll()
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

do {
try setUpListeners(unthrottledChanges: unthrottledChanges, directoryWatcher: &directoryWatcher)
} catch {
cleanupPartialSetup()
throw error
}
}

private func setUpListeners(unthrottledChanges: Topic<Void>, directoryWatcher directoryWatcherOut: inout FSEventsWatcher?) throws {
// listen to ~/Library/Messages itself in order to respond to the WAL
// file being (re)created/deleted
let directoryWatcher = try FSEventsWatcher(watchingPath: messagesDataDirectory.path, latency: 1.0) { [weak self] _, event in
Expand Down Expand Up @@ -125,6 +158,7 @@ public extension IMDatabase {
}
}
directoryWatcher.setDispatchQueue(fsEventsQueue)
directoryWatcherOut = directoryWatcher
try directoryWatcher.start()

try ensureDatabaseFileWatchers(broadcastingTo: unthrottledChanges)
Expand Down
134 changes: 134 additions & 0 deletions src/IMessage/Sources/IMessage/DatabaseTickWaits.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import Foundation
import IMDatabase
import IMessageCore
import PlatformSDK

private let sentMessageLinkWaitTimeout: TimeInterval = 1.5

// Re-query at least this often even without a tick: FSEvents notifications can be
// dropped or coalesced, so a missed tick costs ~1s instead of the full timeout.
private let databaseTickBackstopInterval: TimeInterval = 1.0

enum DatabaseTickWaits {
typealias SentMessageID = (rowID: Int, guid: String)

static func sentMessageIDs(
text: String?,
timeout: TimeInterval,
changes: Topic<Void>,
linkTimeout: TimeInterval = sentMessageLinkWaitTimeout,
backstopInterval: TimeInterval = databaseTickBackstopInterval,
querySentMessageIDs: @escaping @Sendable () throws -> [SentMessageID]
) async throws -> [SentMessageID] {
let startedAt = Date()
let timeoutDeadline = startedAt.addingTimeInterval(timeout)
let linkDeadline = startedAt.addingTimeInterval(linkTimeout)
let expectedNewMessageIDCount = text.map { max($0.linkCount, 1) } ?? 1

while true {
let changeStream = changes.subscribe()
let sentMessageIDs = try querySentMessageIDs()
if sentMessageIDs.count == expectedNewMessageIDCount {
return sentMessageIDs
}
if text != nil, !sentMessageIDs.isEmpty, Date() >= linkDeadline {
return sentMessageIDs
}
if Date() >= timeoutDeadline {
throw ErrorMessage("timed out waiting for sent messages")
}

let wakeDeadline: Date
if text != nil, !sentMessageIDs.isEmpty {
wakeDeadline = min(timeoutDeadline, linkDeadline)
} else {
wakeDeadline = timeoutDeadline
}
_ = try await waitForChange(on: changeStream, until: wakeDeadline, backstopInterval: backstopInterval)
}
}

static func sentThreadIDs(
timeout: TimeInterval,
changes: Topic<Void>,
backstopInterval: TimeInterval = databaseTickBackstopInterval,
querySentThreadIDs: @escaping @Sendable () throws -> [String?]
) async throws -> [String?] {
let deadline = Date().addingTimeInterval(timeout)

while true {
let changeStream = changes.subscribe()
let threadIDs = try querySentThreadIDs()
if !threadIDs.contains(where: { $0 == nil }) || Date() >= deadline {
return threadIDs
}

_ = try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval)
}
}

static func loadedAttachment(
messageID: String,
timeout: TimeInterval,
changes: Topic<Void>,
backstopInterval: TimeInterval = databaseTickBackstopInterval,
loadMessage: @escaping @Sendable () async throws -> PlatformSDK.Message?,
terminalAttachmentFailureState: @escaping @Sendable () async throws -> Attachment.IMFileTransferState?
) async throws -> PlatformSDK.Message {
let deadline = Date().addingTimeInterval(timeout)
var isFirstRead = true

while true {
let changeStream = changes.subscribe()
let message = try await loadMessage()
.orThrow(ErrorMessage("Could not find message \(messageID)"))
let attachments = message.attachments ?? []
if isFirstRead {
guard !attachments.isEmpty else {
throw ErrorMessage("Message \(messageID) has no attachments")
}
isFirstRead = false
}
if !attachments.isEmpty, !attachments.contains(where: { $0.loading == true }) {
return message
}

if let failureState = try await terminalAttachmentFailureState() {
throw ErrorMessage("Attachment in message \(messageID) failed to load (transfer state: \(failureState.rawValue))")
}

guard Date() < deadline else {
throw ErrorMessage("Timed out waiting for attachment in message \(messageID) to load")
}

_ = try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval)
}
}
Comment on lines +131 to +150
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Subscription leak when query succeeds without waiting.

When evaluate returns .finished on the first query attempt, the subscription created at line 134 is never iterated—waitForChange is skipped. Since AsyncStream.onTermination only fires when the stream is iterated, finished, or the iterating task is cancelled, the continuation remains in Topic.subscriptions indefinitely.

Over time, these dangling subscriptions accumulate: each broadcast() will yield to orphaned continuations with .unbounded buffering, causing unbounded memory growth.

Proposed fix: Wrap stream in RAII-style cleanup

Introduce a small wrapper that ensures the stream is consumed/cancelled on scope exit:

+    private struct ScopedSubscription {
+        let stream: AsyncStream<Void>
+        private var iterator: AsyncStream<Void>.AsyncIterator?
+        
+        init(_ stream: AsyncStream<Void>) {
+            self.stream = stream
+        }
+        
+        mutating func consume() async {
+            if iterator == nil {
+                iterator = stream.makeAsyncIterator()
+            }
+            _ = await iterator?.next()
+        }
+    }
+
     private static func waitForDatabaseResult<T>(
         changes: Topic<Void>,
         backstopInterval: TimeInterval,
         query: `@escaping` `@Sendable` () async throws -> T,
         evaluate: (T) async throws -> WaitResult<T>
     ) async throws -> T {
         while true {
-            let changeStream = changes.subscribe()
+            var subscription = ScopedSubscription(changes.subscribe())
+            defer {
+                // Start iteration so onTermination fires when scope exits
+                Task { [subscription] in
+                    var sub = subscription
+                    _ = await sub.stream.makeAsyncIterator().next()
+                }
+            }
             let result = try await query()
             switch try await evaluate(result) {
             case let .finished(value):
                 return value
             case let .waitingUntil(deadline):
-                try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval)
+                await subscription.consume()
+                try await waitForChange(on: subscription.stream, until: deadline, backstopInterval: backstopInterval)
             }
         }
     }

Alternatively, add explicit unsubscribe support to Topic (e.g., subscribe() -> (stream, unsubscribe: () -> Void)).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/IMessage/Sources/IMessage/DatabaseTickWaits.swift` around lines 127 -
143, waitForDatabaseResult creates a subscription via changes.subscribe() (bound
to changeStream) but returns immediately when evaluate(...) yields .finished,
leaving the AsyncStream continuation in Topic.subscriptions and leaking; fix by
introducing an RAII-style subscription guard (e.g., SubscriptionGuard) that
wraps changes.subscribe(), exposes the stream (e.g., guard.stream) and
guarantees on deinit or explicit close() that the underlying subscription is
cancelled/consumed, then use this guard in waitForDatabaseResult instead of a
raw changeStream so that when evaluate returns .finished the guard is
dropped/closed and the subscription is removed; ensure waitForChange continues
to accept guard.stream and that guard has an explicit close() called before
returning the finished value if necessary.


private static func waitForChange(on stream: AsyncStream<Void>, until deadline: Date, backstopInterval: TimeInterval) async throws -> Bool {
let remainingTime = deadline.timeIntervalSinceNow
guard remainingTime > 0 else { return false }

Comment thread
coderabbitai[bot] marked this conversation as resolved.
let sleepTime = min(remainingTime, backstopInterval)

return try await withThrowingTaskGroup(of: Bool.self) { group in
group.addTask {
var iterator = stream.makeAsyncIterator()
return await iterator.next() != nil
}
group.addTask {
try await Task.sleep(forTimeInterval: sleepTime)
return false
}

do {
let changed = try await group.next() ?? false
group.cancelAll()
return changed
} catch {
group.cancelAll()
throw error
}
}
}
}
165 changes: 90 additions & 75 deletions src/IMessage/Sources/IMessage/PlatformAPI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,72 @@ private let messagePageLimit = 20
private let platformLog = Logger(imessageLabel: "platform-api")
private let messageSendTimeout: TimeInterval = 45
private let reactionSendTimeout: TimeInterval = 5
private let waitForLinksTimeout: TimeInterval = 1.5
private let waitForSentThreadTimeout: TimeInterval = 10
private let sentMessagePollInterval: TimeInterval = 0.025
private let loadAttachmentTimeout: TimeInterval = 60
private let loadAttachmentInitialPollInterval: TimeInterval = 0.25
private let loadAttachmentMaxPollInterval: TimeInterval = 5

private final class PlatformAPIDatabase: @unchecked Sendable {
private let database = Protected<IMDatabase?>()
private let state = Protected(State())

func withDatabase<T>(_ action: (IMDatabase) throws -> T) throws -> T {
try database.withLock { cachedDatabase in
if let cachedDatabase {
return try action(cachedDatabase)
}
try state.withLock { state in
try action(try ensureDatabase(&state))
}
}

func changeTopic() throws -> Topic<Void> {
// Snapshot the db reference (and whether setup is still pending) under
// the lock, then run the external setup call OUTSIDE the lock:
// beginListeningForChanges opens FDs, creates an FSEventStream, and
// spawns a Task, none of which should run while we hold the lock.
let (db, shouldSetUp) = try state.withLock { state -> (IMDatabase, Bool) in
let db = try ensureDatabase(&state)
return (db, state.listening == .notStarted)
}

guard shouldSetUp else {
return db.changes
}

// Two callers can both observe `.notStarted` in a rare window and each
// run setup once. beginListeningForChanges is idempotent (it tears down
// any prior watchers/debouncer first), so at most one extra setup in
// that window is harmless; steady state never double-spawns watchers.
Comment thread
indent[bot] marked this conversation as resolved.
Outdated
do {
try db.beginListeningForChanges()
state.withLock { $0.listening = .listening }
} catch {
// Policy: log once and accept backstop-degraded mode. The topic just
// won't tick, but the 1.0s backstop in DatabaseTickWaits carries
// correctness at poll cadence. We deliberately do NOT retry setup on
// every subsequent call (no backoff) to avoid log/FD churn on a
// persistent failure.
state.withLock { $0.listening = .failed }
platformLog.error("failed to begin listening for database changes, falling back to backstop polling: \(error)")
}
return db.changes
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

let newDatabase = try IMDatabase(createIndexes: true)
cachedDatabase = newDatabase
return try action(newDatabase)
/// Lazily opens the process-wide database, caching it on `state`.
private func ensureDatabase(_ state: inout State) throws -> IMDatabase {
if let cached = state.database {
return cached
}
let db = try IMDatabase(createIndexes: true)
state.database = db
return db
}

/// Tracks change-listener setup so we set it up once and don't re-run it on
/// every wait after a persistent failure.
private enum ListeningState {
case notStarted
case listening
case failed
}

private struct State {
var database: IMDatabase?
var listening: ListeningState = .notStarted
}
}

Expand Down Expand Up @@ -410,36 +456,18 @@ public final class PlatformAPI {
messageID: String,
timeout: TimeInterval
) async throws -> PlatformSDK.Message {
let deadline = Date().addingTimeInterval(timeout)
var pollInterval = loadAttachmentInitialPollInterval
var isFirstRead = true

while true {
let message = try await getMessage(threadID: threadID, messageID: messageID)
.orThrow(ErrorMessage("Could not find message \(messageID)"))
let attachments = message.attachments ?? []
if isFirstRead {
guard !attachments.isEmpty else {
throw ErrorMessage("Message \(messageID) has no attachments")
}
isFirstRead = false
}
if !attachments.isEmpty, !attachments.contains(where: { $0.loading == true }) {
return message
}

if let failureState = try await terminalAttachmentFailureState(messageID: messageID) {
throw ErrorMessage("Attachment in message \(messageID) failed to load (transfer state: \(failureState.rawValue))")
}

let remainingTime = deadline.timeIntervalSinceNow
guard remainingTime > 0 else {
throw ErrorMessage("Timed out waiting for attachment in message \(messageID) to load")
let changes = try database.changeTopic()
return try await DatabaseTickWaits.loadedAttachment(
messageID: messageID,
timeout: timeout,
changes: changes,
loadMessage: {
try await self.getMessage(threadID: threadID, messageID: messageID)
},
terminalAttachmentFailureState: {
try await self.terminalAttachmentFailureState(messageID: messageID)
}

try await Task.sleep(forTimeInterval: min(pollInterval, remainingTime))
pollInterval = min(pollInterval * 2, loadAttachmentMaxPollInterval)
}
)
}

/// Returns the first attachment state on `messageID` that is terminal failure
Expand Down Expand Up @@ -764,47 +792,34 @@ public final class PlatformAPI {
timeout: TimeInterval
) async throws -> [(rowID: Int, guid: String)] {
let database = database
return try await Task.detached(priority: .userInitiated) {
let start = Date()
let expectedNewMessageIDCount = text.map { max($0.linkCount, 1) } ?? 1
var sentMessageIDs: [(rowID: Int, guid: String)] = []
while sentMessageIDs.count != expectedNewMessageIDCount {
sentMessageIDs = try database.withDatabase { db in
try db.sentMessageIDs(since: lastRowID)
}
if text != nil, !sentMessageIDs.isEmpty, Date().timeIntervalSince(start) > waitForLinksTimeout {
break
}
if Date().timeIntervalSince(start) > timeout {
throw ErrorMessage("timed out waiting for sent messages")
}
try await Task.sleep(forTimeInterval: sentMessagePollInterval)
let changes = try database.changeTopic()
// Not via Task.detached, so caller cancellation propagates and the wait
// stops promptly instead of running to its own timeout.
return try await DatabaseTickWaits.sentMessageIDs(
text: text,
timeout: timeout,
changes: changes
) {
try database.withDatabase { db in
try db.sentMessageIDs(since: lastRowID)
}
return sentMessageIDs
}.value
}
}

private func waitForSentThreadIDs(messageRowIDs: [Int]) async throws -> [String?] {
let database = database
return try await Task.detached(priority: .userInitiated) {
let sentThreadIDs = {
try messageRowIDs.map { rowID in
try database.withDatabase { db in
try db.threadIDForMessage(rowID: rowID)
}
}
}
var threadIDs = try sentThreadIDs()
let start = Date()
while threadIDs.contains(where: { $0 == nil }) {
try await Task.sleep(forTimeInterval: sentMessagePollInterval)
threadIDs = try sentThreadIDs()
if Date().timeIntervalSince(start) > waitForSentThreadTimeout {
break
let changes = try database.changeTopic()
// Not via Task.detached, so caller cancellation propagates.
return try await DatabaseTickWaits.sentThreadIDs(
timeout: waitForSentThreadTimeout,
changes: changes
) {
try messageRowIDs.map { rowID in
try database.withDatabase { db in
try db.threadIDForMessage(rowID: rowID)
}
}
return threadIDs
}.value
}
}

private func sentMessages(_ sentMessageIDs: [(rowID: Int, guid: String)]) async throws -> [PlatformSDK.Message] {
Expand Down
Loading