feat(replication): add replicationState.awaitDocumentPushed()#8688
feat(replication): add replicationState.awaitDocumentPushed()#8688pubkey wants to merge 8 commits into
Conversation
Adds awaitDocumentPushed(rxDocument) to the replication state which returns a promise that resolves once the given RxDocument state was successfully pushed to the server. It compares the documents _meta.lwt with the upstream (push) checkpoint to know when that version of the document reached the master. - new error code RC_PUSH_AWAIT for pull-only replications - tests for resolve-after-push, resolve-immediately, not-resolve-before-push and the pull-only throw case - docs section including a Promise.race() timeout example #8632 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01W7jgHSwWQBwmDBdagfUYdi
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01W7jgHSwWQBwmDBdagfUYdi
| `awaitDocumentPushed()` does not set a timeout on purpose. If you need one, combine it with `Promise.race()`: | ||
|
|
||
| ```ts | ||
| function timeout(ms: number) { |
There was a problem hiding this comment.
@copilot inline this to make the example shorter
✅ Verify Test Reproduction: Tests FAILED without the fix (expected)This confirms the changed tests correctly reproduce the bug that the source changes fix. This workflow runs the changed tests without the source fix to verify they reproduce the bug. Show output |
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01W7jgHSwWQBwmDBdagfUYdi
| // here we know that the document state was pushed to the server | ||
| ``` | ||
|
|
||
| It works by comparing the documents internal write time (`_meta.lwt`) with the push checkpoint. The push checkpoint moves forward with each successful push, so once it covers the documents write time, RxDB knows that this version of the document has been replicated to the master. |
There was a problem hiding this comment.
explain that another older or newer RxDocument instance of that document is not the exact same RxDcoument. a RxDocument represents the state of the document in a given point in time @copilot
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01W7jgHSwWQBwmDBdagfUYdi
| * document (or any other document with a higher write time) has been | ||
| * pushed, because that also proves the given state reached the server. | ||
| * | ||
| * Notice that this does NOT set a timeout. If you need one, combine it |
There was a problem hiding this comment.
@copilot here we do not need the hint about timeout
| await this.startPromise; | ||
| const internalReplicationState = ensureNotFalsy(this.internalReplicationState); | ||
| const primaryPath = this.collection.schema.primaryPath; | ||
| const docId: string = (doc._data as any)[primaryPath]; |
| } | ||
|
|
||
| return new Promise<void>((resolve, reject) => { | ||
| const sub = internalReplicationState.events.active.up.subscribe((active: boolean) => { |
There was a problem hiding this comment.
@copilot shouldnt we use myRxReplicationState.sent$ here?
There was a problem hiding this comment.
Done in 03ff70e, now triggered by sent$.
One note: sent$ is used as the trigger, but I kept the isDocumentStateOlderThenCheckpoint() comparison as the actual resolve condition instead of resolving directly on the emitted document. Two reasons:
sent$emitsWithDeleted<RxDocType>without_meta, so it does not carry the_meta.lwtneeded to match the exact instance version.sent$fires before the push checkpoint is persisted, and matching only by primary key could resolve on an older in-flight push of the same document. Comparing against the checkpoint (afterawait streamQueue.up) keeps the point-in-time precision.
The same commit also renames the helper to isDocumentStateOlderThenCheckpoint and adds the test where the insert happens before the replication state is created.
Generated by Claude Code
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01W7jgHSwWQBwmDBdagfUYdi
| * A document state is considered pushed when it is NOT after the checkpoint, | ||
| * which mirrors the comparison used by getChangedDocumentsSinceQuery(). | ||
| */ | ||
| export function isDocumentStatePushedToMaster( |
There was a problem hiding this comment.
@copilot call thsi function isDocumentStateOlderThenCheckpoint
| }); | ||
| }); | ||
| describe('.awaitDocumentPushed()', () => { | ||
| it('should resolve after the document was pushed to the master', async () => { |
There was a problem hiding this comment.
@copilot add test where the insert happens before the replication state is created
… helper - re-check the push checkpoint on each sent$ emission instead of the internal active.up observable - rename helper to isDocumentStateOlderThenCheckpoint - add test where the insert happens before the replication state is created Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01W7jgHSwWQBwmDBdagfUYdi
| ]); | ||
| ``` | ||
|
|
||
| Calling `awaitDocumentPushed()` on a replication that has no `push` handler (pull-only) throws an error, because such documents are never sent to the server. |
There was a problem hiding this comment.
we do not need that hint here in the docs
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01W7jgHSwWQBwmDBdagfUYdi
| * pushed state. The checkpoint is written at the end of a push | ||
| * cycle, so we must not read it while a cycle is still in progress. | ||
| */ | ||
| await internalReplicationState.streamQueue.up; |
There was a problem hiding this comment.
i do not think we should await this here, before isPushed is called another cycled could have started already that does not affect our doucment anyways.
There was a problem hiding this comment.
Removed in b5dc0f0. You are right.
I dropped the streamQueue.up await entirely and now resolve directly on the sent$ emission whose primary key matches the document. That emission already proves the document reached the master, so there is no need to wait for any cycle or re-read the checkpoint. The isDocumentStateOlderThenCheckpoint() comparison is now only used for the initial check (documents that were already pushed before awaitDocumentPushed() was called).
Generated by Claude Code
Drop the await on streamQueue.up. A sent$ emission with the document primary key already proves the document reached the master, so we resolve on it directly. The checkpoint comparison is now only used for the initial check of documents that were pushed before awaitDocumentPushed() was called. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01W7jgHSwWQBwmDBdagfUYdi
Summary
Implements the feature requested in #8632: a per-document push acknowledgement on the replication state.
replicationState.awaitDocumentPushed(rxDocument)returns a promise that resolves once the givenRxDocumentinstance was successfully pushed to the server.How it works
Each
RxDocumentcarries its write time in_meta.lwt. The upstream (push) checkpoint has the default RxStorage checkpoint shape{ id, lwt }and moves forward with every successful push, in the sort order[_meta.lwt ASC, primaryKey ASC]. A document state is considered pushed once it is no longer "after" the checkpoint, which mirrors the comparison used bygetChangedDocumentsSinceQuery().The implementation:
sent$emission (when a document is sent) and when the replication becomes idle (active$→false), at which point the checkpoint for that push cycle has been writtenNo timeout is built in by design. The docs show how to combine it with
Promise.race().Behavior notes
RC_PUSH_AWAIT.awaitInitialReplication().Changes
src/plugins/replication/index.ts: newawaitDocumentPushed()method andisDocumentStateOlderThenCheckpoint()helpersrc/plugins/dev-mode/error-messages.ts: new error codeRC_PUSH_AWAITtest/unit/replication.test.ts: tests for resolve-after-push, resolve-immediately-if-already-pushed, not-resolve-before-push, document-inserted-before-replication, and the pull-only throwdocs-src/docs/replication.md: newawaitDocumentPushed()section with aPromise.race()timeout exampleCloses #8632
🤖 Generated with Claude Code
Generated by Claude Code