Replace polling with database-tick waits in PlatformAPI#88
Conversation
…ellation, tests - waitForChange caps each wait at a ~1s backstop so a dropped/coalesced FS tick re-queries at poll cadence instead of hanging to the full timeout - changeTopic() degrades gracefully when beginListeningForChanges() throws: log and return the (silent) topic; the backstop carries the wait - sentMessageIDs/sentThreadIDs waits run inline instead of Task.detached so caller cancellation propagates and they stop promptly - extract ensureDatabase(_:) to dedupe lazy DB init - add regression tests for the timeout/error paths in all three helpers and a finishCurrentSubscribers deadlock guard Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughReplace polling loops with tick/backstop-driven waits: Topic subscriptions use UUID-keyed continuations; DatabaseTickWaits provides sentMessageIDs, sentThreadIDs, loadedAttachment, and waitForChange; PlatformAPI and IMDatabase integrate these changes; tests cover success, timeout, cancellation, and failure modes. ChangesTick-driven database waiting system
🎯 4 (Complex) | ⏱️ ~75 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Pull request overview
This PR replaces several fixed-interval polling loops in PlatformAPI with event-driven waits that block on IMDatabase.changes ticks (with a ~1s backstop recheck), reducing idle SQL querying while preserving similar detection latency.
Changes:
- Introduces
DatabaseTickWaitsto implement tick-driven “subscribe → query → waitForChange(or backstop) → retry” wait loops for sent-message IDs, sent-thread IDs, and attachment loading. - Updates
PlatformAPIto useDatabaseTickWaitsand addsPlatformAPIDatabase.changeTopic()+ensureDatabase(_:)for lazy DB init and change-topic setup (with graceful fallback if change listening fails). - Enhances
Topicto clean up terminated subscriptions (UUID-keyed) and avoid lock reentrancy issues by movingyield()/finish()outside theProtectedlock; adds focused tests for these behaviors.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift | Adds coverage for tick-driven re-query semantics, timeout/partial-return paths, and Topic subscription cleanup/deadlock regression checks. |
| src/IMessage/Sources/IMessageCore/Topic.swift | Adds on-termination subscription cleanup and changes broadcast/finish to operate outside the lock to avoid deadlocks. |
| src/IMessage/Sources/IMessage/PlatformAPI.swift | Replaces polling-based waits with DatabaseTickWaits and adds change-topic acquisition with graceful fallback behavior. |
| src/IMessage/Sources/IMessage/DatabaseTickWaits.swift | New helper implementing tick-based waits with a bounded backstop interval to avoid hanging on missed/coalesced ticks. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Return early when state.database is already set so cache hits don't re-store the same reference under the lock. Generated with [Indent](https://indent.com) Co-Authored-By: KishanBagaria <KishanBagaria@users.noreply.github.com>
…setup, off-lock changeTopic - test the 1.0s backstop re-query path with no FS tick (injectable backstop interval) - test caller-cancellation propagation through DatabaseTickWaits (prompt throw + subscription cleanup) - test loadedAttachment's message-not-found throw path - changeTopic(): run beginListeningForChanges() outside the DB lock; re-acquire only to record state - replace isListeningForChanges bool with ListeningState (notStarted/listening/failed): log once and accept backstop-degraded mode instead of retrying setup on every send - make beginListeningForChanges() idempotent: cancel the prior debouncer Task and tear down partial watcher state on throw Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/IMessage/Sources/IMDatabase/Database/IMDatabase.swift`:
- Around line 111-123: The FSEventsWatcher is currently a local variable named
directoryWatcher inside beginListeningForChanges(), so IMDatabase does not
retain it and events or teardown fail; make FSEventsWatcher a stored property on
IMDatabase (alongside fileWatchers), assign the newly created watcher to that
property instead of only the local directoryWatcher, update
cleanupPartialSetup() and any other teardown paths (including deinit and the
other beginListeningForChanges() block around lines 133-163) to stop/invalidate
and nil out self.directoryWatcher when cleaning up or replacing it, and ensure
the method uses the stored property for lifecycle management rather than a local
variable.
In `@src/IMessage/Sources/IMessage/PlatformAPI.swift`:
- Around line 23-53: The race comes from two callers both observing listening ==
.notStarted and each calling db.beginListeningForChanges(), which mutates shared
IMDatabase state; fix it by reserving the setup slot under the lock: inside
changeTopic() use state.withLock to change listening from .notStarted to a
transient .starting (or similar) and return (db, shouldSetUp) accordingly so
only the thread that successfully transitions .notStarted -> .starting performs
beginListeningForChanges(); other callers seeing .starting should skip calling
db.beginListeningForChanges() and just return db.changes. After a successful
beginListeningForChanges() set listening = .listening under the lock, and on
error set listening = .failed (as before), ensuring ensureDatabase, changeTopic,
state.withLock, beginListeningForChanges, and the listening state transitions
are the only symbols you change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 28357d31-89cf-45b6-b457-03a7008a2c7e
📒 Files selected for processing (4)
src/IMessage/Sources/IMDatabase/Database/IMDatabase.swiftsrc/IMessage/Sources/IMessage/DatabaseTickWaits.swiftsrc/IMessage/Sources/IMessage/PlatformAPI.swiftsrc/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift
📜 Review details
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-05-03T17:00:19.662Z
Learnt from: KishanBagaria
Repo: beeper/platform-imessage PR: 69
File: src/IMessage/Sources/IMessage/EventWatcher/EventWatcher+Updates.swift:89-93
Timestamp: 2026-05-03T17:00:19.662Z
Learning: In the beeper/platform-imessage Swift codebase, keep message IDs (`PlatformSDK.MessageID`) as raw GUIDs. When mapping from DB/event rows to `message.id`, set the ID directly from `msgRow.guid` (no GUID→public-ID hashing or transformation). For multi-part messages, append the part index as `_<part.index>` to the GUID-derived ID. During code review, if changes touch message ID creation/mapping, ensure this raw GUID + optional `_<part.index>` suffix behavior is preserved.
Applied to files:
src/IMessage/Sources/IMDatabase/Database/IMDatabase.swiftsrc/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swiftsrc/IMessage/Sources/IMessage/DatabaseTickWaits.swiftsrc/IMessage/Sources/IMessage/PlatformAPI.swift
🪛 SwiftLint (0.63.2)
src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift
[Warning] 54-54: Attributes should be on their own lines in functions and types, but on the same line as variables and imports
(attributes)
[Warning] 83-83: Attributes should be on their own lines in functions and types, but on the same line as variables and imports
(attributes)
[Warning] 234-234: Attributes should be on their own lines in functions and types, but on the same line as variables and imports
(attributes)
changeTopic() ran beginListeningForChanges() outside the state lock and let two callers both observe .notStarted, so two threads could concurrently mutate IMDatabase's unsynchronized fileWatchers array and debouncer. Claim the setup slot atomically (.notStarted -> .starting) under the lock so only one caller runs setup; the loser returns the topic and rides the backstop. Also assign directoryWatcherOut only after start() succeeds, so the partial-setup cleanup can't stop/invalidate a never-started FSEventStream. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Retain the directory watcher so repeated listener setup can stop it alongside file watchers and the debouncer. Serialize listener setup and filesystem callback refreshes to avoid racing watcher state. Co-Authored-By: Codex <codex@openai.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/IMessage/Sources/IMessage/DatabaseTickWaits.swift`:
- Around line 145-148: The early return in
waitForChange(on:until:backstopInterval:) when remainingTime <= 0 leaves the
AsyncStream subscription dangling; change the implementation so that instead of
returning immediately it consumes a single element from the provided
AsyncStream<Void> (i.e., await one tick from the stream) before returning to
ensure the subscription is completed, or alternatively add explicit
cancellation/unsubscription logic at the caller (waitForDatabaseResult) if you
prefer to handle teardown there; locate waitForChange and modify the early-exit
path to await a single stream element (or call the caller-level unsubscription)
so the AsyncStream subscription is not leaked.
- Around line 127-143: waitForDatabaseResult creates a subscription via
changes.subscribe() (bound to changeStream) but returns immediately when
evaluate(...) yields .finished, leaving the AsyncStream continuation in
Topic.subscriptions and leaking; fix by introducing an RAII-style subscription
guard (e.g., SubscriptionGuard) that wraps changes.subscribe(), exposes the
stream (e.g., guard.stream) and guarantees on deinit or explicit close() that
the underlying subscription is cancelled/consumed, then use this guard in
waitForDatabaseResult instead of a raw changeStream so that when evaluate
returns .finished the guard is dropped/closed and the subscription is removed;
ensure waitForChange continues to accept guard.stream and that guard has an
explicit close() called before returning the finished value if necessary.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9212785a-df21-47a1-884a-76f33874d373
📒 Files selected for processing (3)
src/IMessage/Sources/IMessage/DatabaseTickWaits.swiftsrc/IMessage/Sources/IMessageCore/Topic.swiftsrc/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift
📜 Review details
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-05-03T17:00:19.662Z
Learnt from: KishanBagaria
Repo: beeper/platform-imessage PR: 69
File: src/IMessage/Sources/IMessage/EventWatcher/EventWatcher+Updates.swift:89-93
Timestamp: 2026-05-03T17:00:19.662Z
Learning: In the beeper/platform-imessage Swift codebase, keep message IDs (`PlatformSDK.MessageID`) as raw GUIDs. When mapping from DB/event rows to `message.id`, set the ID directly from `msgRow.guid` (no GUID→public-ID hashing or transformation). For multi-part messages, append the part index as `_<part.index>` to the GUID-derived ID. During code review, if changes touch message ID creation/mapping, ensure this raw GUID + optional `_<part.index>` suffix behavior is preserved.
Applied to files:
src/IMessage/Sources/IMessageCore/Topic.swiftsrc/IMessage/Sources/IMessage/DatabaseTickWaits.swiftsrc/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift
🔇 Additional comments (6)
src/IMessage/Sources/IMessageTests/DatabaseTickWaitTests.swift (1)
1-325: LGTM!src/IMessage/Sources/IMessageCore/Topic.swift (1)
7-7: LGTM!Also applies to: 17-24, 26-39, 50-58
src/IMessage/Sources/IMessage/DatabaseTickWaits.swift (4)
20-59: LGTM!
61-82: LGTM!
84-125: LGTM!
151-167: LGTM!
| private static func waitForDatabaseResult<T>( | ||
| changes: Topic<Void>, | ||
| backstopInterval: TimeInterval, | ||
| query: @escaping @Sendable () async throws -> T, | ||
| evaluate: (T) async throws -> WaitResult<T> | ||
| ) async throws -> T { | ||
| while true { | ||
| let changeStream = changes.subscribe() | ||
| let result = try await query() | ||
| switch try await evaluate(result) { | ||
| case let .finished(value): | ||
| return value | ||
| case let .waitingUntil(deadline): | ||
| try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Subscription leak when query succeeds without waiting.
When evaluate returns .finished on the first query attempt, the subscription created at line 134 is never iterated—waitForChange is skipped. Since AsyncStream.onTermination only fires when the stream is iterated, finished, or the iterating task is cancelled, the continuation remains in Topic.subscriptions indefinitely.
Over time, these dangling subscriptions accumulate: each broadcast() will yield to orphaned continuations with .unbounded buffering, causing unbounded memory growth.
Proposed fix: Wrap stream in RAII-style cleanup
Introduce a small wrapper that ensures the stream is consumed/cancelled on scope exit:
+ private struct ScopedSubscription {
+ let stream: AsyncStream<Void>
+ private var iterator: AsyncStream<Void>.AsyncIterator?
+
+ init(_ stream: AsyncStream<Void>) {
+ self.stream = stream
+ }
+
+ mutating func consume() async {
+ if iterator == nil {
+ iterator = stream.makeAsyncIterator()
+ }
+ _ = await iterator?.next()
+ }
+ }
+
private static func waitForDatabaseResult<T>(
changes: Topic<Void>,
backstopInterval: TimeInterval,
query: `@escaping` `@Sendable` () async throws -> T,
evaluate: (T) async throws -> WaitResult<T>
) async throws -> T {
while true {
- let changeStream = changes.subscribe()
+ var subscription = ScopedSubscription(changes.subscribe())
+ defer {
+ // Start iteration so onTermination fires when scope exits
+ Task { [subscription] in
+ var sub = subscription
+ _ = await sub.stream.makeAsyncIterator().next()
+ }
+ }
let result = try await query()
switch try await evaluate(result) {
case let .finished(value):
return value
case let .waitingUntil(deadline):
- try await waitForChange(on: changeStream, until: deadline, backstopInterval: backstopInterval)
+ await subscription.consume()
+ try await waitForChange(on: subscription.stream, until: deadline, backstopInterval: backstopInterval)
}
}
}Alternatively, add explicit unsubscribe support to Topic (e.g., subscribe() -> (stream, unsubscribe: () -> Void)).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/IMessage/Sources/IMessage/DatabaseTickWaits.swift` around lines 127 -
143, waitForDatabaseResult creates a subscription via changes.subscribe() (bound
to changeStream) but returns immediately when evaluate(...) yields .finished,
leaving the AsyncStream continuation in Topic.subscriptions and leaking; fix by
introducing an RAII-style subscription guard (e.g., SubscriptionGuard) that
wraps changes.subscribe(), exposes the stream (e.g., guard.stream) and
guarantees on deinit or explicit close() that the underlying subscription is
cancelled/consumed, then use this guard in waitForDatabaseResult instead of a
raw changeStream so that when evaluate returns .finished the guard is
dropped/closed and the subscription is removed; ensure waitForChange continues
to accept guard.stream and that guard has an explicit close() called before
returning the finished value if necessary.
Co-Authored-By: Codex <codex@openai.com>
What
Replaces the fixed-interval polling loops in
PlatformAPI(25ms / 250ms / exponential-backoffTask.sleep) for sent-message, sent-thread, and attachment-load waits with event-driven waits on the existingIMDatabase.changesTopic<Void>— the same debounced (25ms) filesystem-watcher tick the rest of the system already uses. Idle waits stop issuing wasted SQL queries; detection latency matches the old 25ms cadence.New
DatabaseTickWaitshelper holds the three wait functions; each loopssubscribe → query → satisfied? return : waitForChange(tick OR deadline). The subscribe-before-query ordering avoids lost wakeups.TopicgainsonTermination-based subscription cleanup (keyed byUUID) and movesyield()/finish()out of the non-reentrantos_unfair_lockcritical section.Hardening (from eng review)
waitForChangecaps each wait at ~1s, so a dropped/coalesced FS tick re-queries at poll cadence instead of hanging to the full timeout (restores the old pollers' self-healing).changeTopic()no longer fails the wait ifbeginListeningForChanges()throws — it logs and returns the (silent) topic; the backstop carries the wait.Task.detached, so caller cancellation propagates and they stop promptly.ensureDatabase(_:)for lazy DB init.Tests
DatabaseTickWaitTests— 11/11 passing:Topicterminated-subscription cleanup +finishCurrentSubscribersdeadlock guard🤖 Generated with Claude Code