[Draft][Durability] Sync replication #3704
Move the expensive AOF write+fsync off the main thread when IO threads are available. This prevents the main thread from blocking on disk I/O when appendfsync is set to 'always'. Add a generic trySendJobToIOThreads() API to io_threads with round-robin distribution, and an AOF IO flush state machine (IDLE/PENDING/DONE/ERR) with atomic coordination between the main and IO threads. The adjustIOThreadsByEventLoad() function gains a has_background_work parameter to ensure IO threads stay active while AOF fsync work is pending, even during low-traffic periods. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Introduce a provider registry that allows multiple durability backends (AOF fsync, replicas, etc.) to register and contribute to a consensus offset. The overall durability consensus is the MIN (AND) of all enabled providers' acknowledged offsets. Include the built-in AOF provider that tracks fsynced_reploff_pending when appendfsync=always, and transparently passes through when not. Add pause/resume support for providers (used via DEBUG commands) to enable deterministic testing by freezing a provider's acknowledged offset at a point-in-time snapshot. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Add a task registry that defers side-effects (keyspace notifications, key invalidations, flush invalidations) until durability providers acknowledge the associated write offset. Each task type registers create/destroy/execute/onClientDestroy handlers. Tasks are created during command execution with a deferred offset, then moved to an official waiting list once the replication offset is known. When the consensus offset advances past a task's offset, the task is executed and freed. Key invalidation tasks track the originating client pointer and properly handle client disconnection before task execution. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Track which keys have been modified but not yet acknowledged by durability providers using a per-database hashtable. This enables rejecting reads of uncommitted keys to ensure clients only see durable data (zero-data-loss semantics). Each uncommitted key stores the replication offset at which it was last modified. Keys are purged when the durability consensus offset advances past their stored offset. Include incremental cleanup via serverCron that scans databases round-robin with a configurable time limit, plus immediate purging on read access (lazy cleanup). Also handle database-level modifications (FLUSHDB, FLUSHALL, SWAPDB) and function store dirty tracking for transactions. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Add the core orchestration layer that blocks client responses in the client output buffer (COB) until durability providers confirm the write offset, then unblocks and flushes responses to clients.
reply_blocking.c/h contains:
- durabilityInit/Cleanup/Reset lifecycle management
- beforeCommandTrackReplOffset/afterCommandTrackReplOffset for tracking which replication offsets each command produces
- preCommandExec: rejects commands accessing uncommitted keys
- postCommandExec: blocks client responses until providers acknowledge
- notifyDurabilityProgress: called from beforeSleep to unblock clients whose offsets have been acknowledged
- blockClientOnReplOffset/unblockResponsesWithAckOffset
- Function store dirty tracking for FUNCTION LOAD/DELETE
- INFO durability stats generation
Integration points across the server:
- server.c: init/cleanup in server lifecycle, pre/post command hooks in call() and processCommand(), notifyDurabilityProgress in beforeSleep, uncommitted keys cleanup in serverCron, per-DB init, INFO section
- server.h: durable_t in server struct, clientDurabilityInfo in client, uncommitted_keys/dirty_repl_offset in serverDb, new client flag
- config.c: 'durability' bool config with dynamic update callback
- db.c: durabilitySignalModifiedKey/durabilitySignalFlushedDb hooks
- networking.c: client durability init/reset, COB reply limiting
- notify.c: defer keyspace notifications when durability is enabled
- script.c/module.c: pre-script checks for uncommitted data access
- replication.c: clear durability state on primary change
- debug.c: durability-provider-pause/resume DEBUG subcommands
- object.c: getIntFromObject utility
Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Add reply_blocking.c, durable_task.c, durability_provider.c, and uncommitted_keys.c to the build system (both Makefile and CMake). Also fix a clang compatibility issue in unit test CMakeLists.txt: -fno-var-tracking-assignments is GCC-only, so guard it with a compiler ID check. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Add comprehensive gtest-based unit tests covering the reply blocking subsystem, including:
- Client output buffer blocking and unblocking mechanics
- Offset tracking through command execution
- Multi-command transaction (MULTI/EXEC) offset handling
- Durability provider consensus calculations
- Deferred task lifecycle (create, execute, cleanup)
- Uncommitted key tracking and purging
- Edge cases: client disconnection, provider pause/resume
Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Add Tcl-based integration tests (1,051 lines) covering end-to-end durability behavior, including:
- AOF-based response blocking with appendfsync=always
- Provider pause/resume via DEBUG commands for deterministic testing
- Uncommitted key rejection (reads return an error for dirty keys)
- MULTI/EXEC transaction durability semantics
- Lua script and FCALL durability checks
- Function store (FUNCTION LOAD/DELETE) durability blocking
- Client disconnection during blocked state
- Multiple concurrent clients with interleaved blocking/unblocking
- INFO durability stats verification
Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Remove the standalone 'durability' / 'sync-replication' bool config. Durability is now implied by the combination of appendonly + appendfsync always — there is no need for a separate knob.
- isDurabilityEnabled() now checks aof_state != AOF_OFF && aof_fsync == AOF_FSYNC_ALWAYS
- Removed 'enabled' field from durable_t struct
- AOF durability provider isEnabled() now requires appendfsync always (instead of a pass-through returning primary_repl_offset)
- updateAppendFsync() and updateAppendOnly() call durabilityReset()
- Updated integration and unit tests to use appendfsync always/everysec
Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
… queue
Separate the two concerns in uncommitted key tracking:
1. Is a key dirty? Mark it immediately when mutated, even inside MULTI/EXEC (using LLONG_MAX as a placeholder offset so reads block).
2. When can we clean it? Use a FIFO offset tracker queue in the durability system. After a transaction completes, update the placeholder to the real offset and enqueue for cleanup. drainCommittedKeys() pops from the queue head when the committed offset advances, replacing the old periodic scan-based clearUncommittedKeysAcknowledged() cron job entirely.
- Removed keys_cleanup_time_limit_ms and curr_db_scan_idx from durable_t
- Removed getUncommittedKeysCleanupTimeLimit()
- drainCommittedKeys() called from notifyDurabilityProgress()
- Removed periodic clearUncommittedKeysAcknowledged() from serverCron
Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
isDurabilityEnabled() no longer hardcodes the AOF check. It delegates to anyDurabilityProviderEnabled(), so any registered provider (including future non-AOF providers) automatically enables the durability system. The AOF-specific check stays in aofProviderIsEnabled() where it belongs. Also fixes UncommittedKeysTest SIGSEGV by initializing the offset tracker queue in the test fixture. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
durable_task.c: Copy the event string (argv[1]) in createKeyspaceNotifyTask() using zstrdup() instead of storing the raw pointer. Module-triggered keyspace notifications can free the event string after the call returns, leaving a dangling pointer. The copy is freed in destroyKeyspaceNotifyTask().
Tests added:
- Unit: KeyspaceNotifyTaskCopiesEventString verifies the task survives after the original event string is freed.
- Tcl: 'Pipelined non-blocking then blocking command does not leak blocked reply' verifies no blocked bytes leak to the client when PING+SET are pipelined while the provider is paused.
Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Remove redundant includes of reply_blocking.h, durable_task.h, durability_provider.h, and uncommitted_keys.h before server.h in the durability .c files. These headers include <sys/types.h> which, when included before fmacros.h (via server.h), causes off_t to be 32-bit on 32-bit builds, failing the static_assert. Also replace sizeof(long long)==sizeof(void*) static_assert with intptr_t casts in durable_task.c, since long long is 8 bytes but void* is 4 bytes on 32-bit targets. Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
…parison
In clientHasPendingReplies(), the comparison of io_last_written.data_len with n->disallowed_byte_offset is incompatible with copy avoidance. data_len tracks decoded RESP bytes written to the socket, while disallowed_byte_offset is recorded in encoded buffer coordinates. After a partial write by IO threads, data_len (e.g. 1035) can exceed the encoded boundary (e.g. 32), causing the server to incorrectly report no pending replies and release the response prematurely. Replace data_len with bufpos, which tracks the raw encoded buffer position and is directly comparable with disallowed_byte_offset.
- Added unit test (ClientHasPendingRepliesUsesBufposNotDataLen) that deterministically sets up the post-partial-write state where bufpos < boundary but data_len > boundary, verifying the fix.
- Added DEBUG set-io-last-written command and a Tcl integration test that uses it to inject divergent io_last_written state on a blocked client, exercising the clientHasPendingReplies comparison with copy avoidance.
Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
…eftovers
- Remove incorrect auto-generation comment from src/unit/test_files.h referencing the non-existent utils/generate-unit-test-header.py script.
- Add #include "fmacros.h" to reply_blocking.h and durable_task.h before system headers so _FILE_OFFSET_BITS=64 is defined early, fixing the static_assert(sizeof(off_t) >= 8) failure on 32-bit builds.
- Remove shouldRejectCommandWithUncommittedData() and its call chain (isAccessingUncommittedData, isSingleCommandAccessingUncommittedKeys, selectGetParams, DURABILITY_DATA_UNAVAILABLE), which were leftovers from the sync-replication design where a demoted primary would reject reads on uncommitted data. This is not relevant to AOF durability.
Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
- Replace getKeysFromCommand with getKeysUsingKeySpecs in afterCommandTrackReplOffset to avoid calling setGetKeys on argv rewritten during command execution (EX->PXAT)
- Remove redundant AOF_FSYNC_ALWAYS check and unreachable error path in processAofIOThreadFlushResult
- Add explicit aof_io_flush_state==IDLE precondition check in tryOffloadAofAlwaysFlushToIOThreads
- Defer aof_buf reset until after trySendJobToIOThreads succeeds
- Add 30-second timeout to busy-wait loop in flushAppendOnlyFile
- Add handleUncommittedKeyForClient for expiry and eviction paths
- Handle COPY/MOVE destination DB in non-replicating blocking offset
- Swap uncommitted key tracking in dbSwapDatabases/swapMainDbWithTempDb
- Add explanatory comments for beforeSleep guard and afterCommand
- Fix Pub/Sub typo in server.h
Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
Replace the FIFO offset tracker queue in uncommitted key cleanup with direct hashtable iteration using HASHTABLE_ITER_SAFE. On each fsync completion, iterate the per-DB uncommitted_keys hashtable and delete entries whose offset has been committed. This is simpler, uses less memory, and handles hot keys better:
- Removes offsetTrackerEntry struct, offset_tracker_queue list, and enqueueOffsetTracker() function (~100 fewer lines)
- No per-write allocation overhead (no queue push + sdsdup per key)
- ~70% less memory per uncommitted key (no duplicate key strings)
- Hot keys don't cause queue bloat (hashtable deduplicates in-place)
- Drain cost is negligible (~14us for 200 keys) vs 2-6ms fsync
Also removes unused uncommitted_keys_cursor and scan_in_progress fields from serverDb, and fixes _Atomic C++ compatibility in server.h.
Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
… keys on FLUSHALL in MULTI
Two bugs fixed:
1. Client structs are allocated with zmalloc (not zeroed), so clientDurabilityInfo.blocked_responses contains garbage when durabilityClientInit() returns early because durability is disabled. When a test later enables appendfsync=always at runtime, isClientReplyBufferLimited() reads the garbage pointer on existing clients, causing a SIGSEGV in clientHasPendingReplies(). Fix: memset-zero clientDurabilityInfo in createClient().
2. FLUSHALL inside MULTI/EXEC clears the pending_uncommitted_keys list but leaves stale entries in the per-DB uncommitted_keys hashtables with LLONG_MAX placeholder offsets. These entries never get cleaned up because processPendingUncommittedData() lost the references needed to update them. Subsequent reads on those keys hang forever, waiting for an offset that will never be acked. Fix: clear the uncommitted_keys hashtables when all_dbs_dirty is set.
Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
📝 Walkthrough
This PR implements a comprehensive synchronous replication durability system for Valkey. It introduces pluggable durability providers (AOF fsync and replication ISR), deferred task execution, reply blocking until consensus acknowledgement, uncommitted key tracking across databases, and AOF fsync offloading to IO threads. The system integrates throughout the command execution pipeline and includes extensive test coverage.
Changes: Synchronous Replication Durability
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Actionable comments posted: 18
Caution: Some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/server.h (1)
1168-1253: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
`ClientFlags` bitfield now exceeds the 64-bit `raw_flag` union overlay. The struct now has 65 one-bit fields, which spills beyond the `uint64_t raw_flag` union member. Code that resets flags via `raw_flag = 0` (in `src/aof.c`, `src/networking.c`, and `src/module.c`) will not clear `durable_blocked_client`, leaving stale values that can cause initialization bugs when this flag is used. Restructure `ClientFlags` to fit within 64 bits, or move `durable_blocked_client` to a separate flag field outside the union.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/server.h` around lines 1168 - 1253, The ClientFlags bitfield now defines 65 one-bit members (notably durable_blocked_client) which overflows the 64-bit raw_flag union used elsewhere; fix by moving durable_blocked_client out of the ClientFlags union (or otherwise reducing fields to <=64 bits) so that raw_flag = 0 actually clears all flags. Concretely, remove durable_blocked_client from the ClientFlags struct and add a separate uint8_t/uint64_t flag field (e.g., extra_client_flags or a standalone durable_blocked_client boolean) on the client struct, update references to durable_blocked_client to use the new field, and ensure codepaths that reset flags (code using raw_flag = 0 in functions referenced in src/aof.c, src/networking.c, src/module.c) still only clear the original 64-bit raw_flag while the new flag is explicitly initialized/cleared where clients are created/freed.
🧹 Nitpick comments (4)
src/script.c (1)
205-205: ⚡ Quick win | Use C-style comment syntax in C sources.
Line 205 uses `//` style. Please switch it to `/* ... */` to match C-file conventions.
Proposed fix:

```diff
- // check if sync replication would want to stop the execution.
+ /* Check if sync replication should stop script execution. */
```

As per coding guidelines, "Use C-style /* */ comments for single or multi-line comments in C code".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/script.c` at line 205, Replace the C++-style single-line comment starting with "// check if sync replication would want to stop the execution." with a C-style comment using /* ... */ in src/script.c so the comment reads /* check if sync replication would want to stop the execution. */; locate the exact comment text to make the change (no code logic changes required).
src/object.c (1)
1163-1171: ⚡ Quick win | Add a function-level doc comment for `getIntFromObject`.
Please document the contract (parse as integer object and enforce `int` bounds), consistent with C function documentation rules.
Proposed fix:

```diff
+/* Parse object as long long and store it as int when value is within int range. */
 int getIntFromObject(robj *o, int *target) {
     long long value;
     if (getLongLongFromObject(o, &value) != C_OK) return C_ERR;
     if (value < INT_MIN || value > INT_MAX) return C_ERR;
     *target = value;
     return C_OK;
 }
```

As per coding guidelines, "Document why code exists, not just what it does; document all functions in C code".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/object.c` around lines 1163 - 1171, Add a function-level C doc comment above getIntFromObject describing its contract: it parses a Redis object as an integer using getLongLongFromObject, enforces that the parsed value fits into C int bounds (INT_MIN..INT_MAX), stores the result into the out-parameter target on success, and returns C_OK on success or C_ERR on failure; document arguments (robj *o, int *target), return values (C_OK/C_ERR), the error conditions (non-integer object or out-of-range), and any relevant notes (no allocation, callers must provide non-NULL target). Ensure the comment follows the project's C comment style and is placed immediately above the getIntFromObject definition.
src/aof.c (2)
55-70: Request `@core-team` review for the AOF durability path.
This change introduces a new cross-thread durability/flush state machine in `src/aof.c`; it would benefit from the architectural review required for this file class before merge.
As per coding guidelines, "Request `@core-team` architectural review for changes to cluster*.c, replication.c, rdb.c, or aof.c".
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/aof.c` around lines 55 - 70, The change adds a cross-thread AOF durability/flush state machine in src/aof.c (see enum AOF_IO_FLUSH_*, struct aofIOFlushJob, and functions processAofIOThreadFlushResult + tryOffloadAofAlwaysFlushToIOThreads) and must include an explicit architectural review request per project guidelines; update the diff by inserting a clear code comment or TODO at the top of the new section (near the enum or above the two static function declarations) that requests an `@core-team` architectural review for this change and briefly states the reason (cross-thread durability/flush state machine), and also add a short note in the PR description referencing this comment so reviewers see it.
1183-1293: ⚡ Quick win | Document the handoff/state-machine invariants for the IO-thread flush path.
This is the non-obvious part of the change: ownership of `server.aof_buf` moves to the worker, only one flush may be in flight, and `DONE`/`ERR` must be consumed on the main thread before the next flush cycle. Please add function-level comments around these helpers so the lifetime and ordering rules are explicit.
As per coding guidelines, "Document why code exists, not just what it does; document all functions in C code".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/aof.c` around lines 1183 - 1293, Add function-level comments to document the IO-thread AOF flush invariants: in aofIOThreadFlushJobHandler, processAofIOThreadFlushResult, and tryOffloadAofAlwaysFlushToIOThreads explain that ownership of server.aof_buf is transferred to the IO worker when tryOffloadAofAlwaysFlushToIOThreads hands off a job, that only one flush may be in flight (enforced by server.aof_io_flush_state values AOF_IO_FLUSH_PENDING/DONE/ERR/IDLE), that DONE or ERR results must be consumed on the main thread by processAofIOThreadFlushResult before another flush is started, and clarify the ordering guarantees (which atomics control the producer/consumer handoff and the use of memory_order_acquire/release), plus when notifyDurabilityProgress() is used to unblock clients; place these explanatory comments immediately above each of the three functions named above.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/aof.c`:
- Around line 1319-1334: The code can miss a terminal IO-thread state transition
occurring after the first processAofIOThreadFlushResult() and before
aofIOFlushInProgress() checks, letting flushAppendOnlyFile(1) proceed on stale
state; fix by ensuring the terminal state is always consumed before continuing:
after calling processAofIOThreadFlushResult() initially, and again after the
busy-wait loop, loop-call processAofIOThreadFlushResult() (or re-check until
aofIOFlushInProgress() is false and no AOF_IO_FLUSH_PENDING remains and any
AOF_IO_FLUSH_DONE/AOF_IO_FLUSH_ERR has been handled) so stopAppendOnly() and
openNewIncrAofForAppend() only run after the IO thread’s final DONE/ERR has been
processed.
In `@src/config.c`:
- Line 3268: The config "sync-eligible" is declared MODIFIABLE_CONFIG via
createBoolConfig("sync-eligible", NULL, MODIFIABLE_CONFIG, server.sync_eligible,
...) but has no apply callback, so runtime CONFIG SET won't re-advertise
eligibility to the primary; either change the flag to IMMUTABLE_CONFIG to
prevent runtime changes, or add an apply hook function (passed instead of NULL)
that updates server.sync_eligible and triggers the replica handshake
refresh/reconnection (i.e., call the routine that forces re-run of the
SEND_HANDSHAKE logic or reconnects the replica) so the primary sees the new
value immediately; locate the config entry in createBoolConfig and implement the
apply handler to perform this refresh/reconnect.
In `@src/db.c`:
- Around line 1828-1833: The swap leaves tempDb[i]->uncommitted_keys pointing to
a heap-allocated hashtable created in durabilityInitDatabase (via
hashtableCreate), but discardTempDb() currently only calls kvstoreRelease on
keys/expires/keys_with_volatile_items and omits freeing uncommitted_keys; update
discardTempDb() to call hashtableRelease(tempDb[i]->uncommitted_keys) (or
equivalent cleanup) for each tempDb entry (before or after existing
kvstoreRelease calls) to free the swapped hashtable and prevent a leak.
In `@src/debug.c`:
- Around line 1088-1099: The code reads bufpos_val and data_len_val via
getLongLongFromObjectOrReply and casts them to size_t before storing into
target->io_last_written.bufpos/data_len, which allows negative inputs to wrap to
huge sizes; validate both values are non-negative and enforce bounds against the
target client's buffer capacity before assigning: after lookupClientByID
succeeds, check bufpos_val >= 0 and data_len_val >= 0, then ensure
(size_t)bufpos_val <= target buffer capacity and (size_t)data_len_val <= (target
buffer capacity - (size_t)bufpos_val) (so bufpos + data_len does not overflow
the buffer), and return addReplyError for invalid inputs; apply these checks in
the DEBUG set-io-last-written handling around the assignments to
target->io_last_written.
In `@src/durability_provider.c`:
- Around line 48-50: resetDurabilityProviders only clears
num_durability_providers but leaves static built-in provider state (paused /
pausedOffset) intact; update resetDurabilityProviders to also iterate the
built_in_durability_providers array and reset each provider's paused = false and
pausedOffset = 0 (or the struct's default) before clearing
num_durability_providers so re-registration cannot resurrect stale pause state;
reference the built_in_durability_providers array and the paused / pausedOffset
fields on the provider struct when making the change.
In `@src/durable_task.c`:
- Around line 187-193: The deferred FLUSH task currently reconstructs the async
flag from argv[0] as a boolean and then calls
trackingInvalidateKeysOnFlush(is_flush_all ? -1 : 0), which loses the original
async semantics; change the task payload (taskWaitingAck.argv) to store the
original async value produced by db.c (0 or 1) alongside the flush type, update
executeFlushInvalidationTask to read that async value from task->argv (instead
of synthesizing -1/0) and pass the preserved async into
trackingInvalidateKeysOnFlush, and ensure any code that builds the task (the
FLUSH scheduling path in db.c) writes the async flag into the payload.
In `@src/module.c`:
- Around line 6810-6818: The early return path after calling preScriptCmd(c) can
jump to the cleanup label and skip restoring server.replication_allowed; capture
the current value into a local (e.g., int old_replication_allowed =
server.replication_allowed) before mutating it and ensure you restore it on all
exit paths — either by assigning server.replication_allowed =
old_replication_allowed immediately before the goto in the pre_script_err !=
NULL branch, or by making the cleanup block always restore
server.replication_allowed (so preScriptCmd, pre_script_err,
server.replication_allowed and the cleanup label are updated accordingly).
In `@src/networking.c`:
- Around line 1719-1729: The code unconditionally dereferences
listFirst(c->clientDurabilityInfo.blocked_responses) into blockedResponse *n
which can be NULL; update the isClientReplyBufferLimited() branch to first check
that listFirst(c->clientDurabilityInfo.blocked_responses) is non-NULL before
using it, and if it is NULL skip this special blocked-response comparison and
fall back to the existing normal pending-reply check (i.e., behave as if there
is no blocked_response); ensure you reference the same symbols
(c->clientDurabilityInfo.blocked_responses, listFirst, blockedResponse *n,
n->disallowed_reply_block, n->disallowed_byte_offset, c->reply,
c->io_last_written.bufpos) so the null-guard is colocated with the current
logic.
In `@src/notify.c`:
- Around line 121-140: The current deferral branch queues all notifications (via
durabilityRegisterDeferredTask and setting NOTIFY_IN_DURABLE_TASK) when
isPrimaryDurabilityEnabled() is true, which incorrectly delays read-side
NOTIFY_KEY_MISS events emitted by lookupKey(); modify the logic to exclude
NOTIFY_KEY_MISS from the durable deferral path. Concretely, update the
computation/condition around shouldSendDelayedNotificationToClients (or the
branch that sets type |= NOTIFY_IN_DURABLE_TASK and calls
durabilityRegisterDeferredTask with DURABLE_KEYSPACE_NOTIFY_TASK) to require
that (type & NOTIFY_KEY_MISS) is false (e.g. add && !(type & NOTIFY_KEY_MISS))
so miss notifications bypass durability deferral and are delivered immediately.
In `@src/reply_blocking.c`:
- Around line 448-454: The code reads argv[2] into dest_dbid via
getIntFromObject and then indexes server.db[dest_dbid] without validating the
parsed DB id; add a bounds check after getIntFromObject to ensure dest_dbid is
>= 0 and < server.dbnum, call getKeysFreeResult(&result) and return -1 on
invalid DB, and only call handleUncommittedKeyForClient(c, c->argv[keys[0].pos],
server.db[dest_dbid]) when the DB id is valid; apply the same validation at the
other occurrence that handles MOVE (the block around
handleUncommittedKeyForClient at the second location).
- Around line 291-296: The debug log in blockClientOnReplOffset unconditionally
dereferences c->cmd (c->cmd->declared_name and c->cmd->flags) which can be NULL
for MONITOR clients; update the logging to guard against NULL by checking c->cmd
(and do the same in the analogous block in blockClientAndMonitorsOnReplOffset)
and use safe fallbacks (e.g. a "<unknown>" name and 0 for flags) or a
conditional expression when building the serverLog call so no dereference occurs
when c->cmd is NULL.
- Around line 565-574: durabilitySetClientCmdFlags is only OR-ing last-command
bits so once a client ever executes a write the DURABILITY_CLIENT_LAST_CMD_WRITE
bit is never cleared; update durabilitySetClientCmdFlags to first clear both
DURABILITY_CLIENT_LAST_CMD_WRITE and DURABILITY_CLIENT_LAST_CMD_READONLY from
c->clientDurabilityInfo.durability_flags, then set exactly one of those bits
based on c->cmd->flags (checking CMD_WRITE and CMD_READONLY) so each command
re-classifies the client's last-command state correctly (this prevents
initCmdMetrics from seeing stale write bits).
- Around line 803-818: durabilityReset currently only drains per-client
durability state (via durabilityClientInit) when durability is enabled, and when
disabling durability it skips clearing clients_waiting_ack for replicas, leaving
blocked waiters stuck; fix by iterating server.clients in the "else" branch
(durability disabled) and for each client call durabilityClientInit (or the
function that clears the client's clients_waiting_ack) so all clients on
replicas also have their durability waiter lists drained; keep the existing
iAmPrimary()/durabilityResetPrimaryState and clearAllUncommittedKeys logic but
add the client-iteration cleanup to the else path.
In `@src/server.c`:
- Around line 2059-2063: The current aof_needs_io check only inspects
server.aof_buf length and misses cases where flushAppendOnlyFile() has already
dispatched work to an IO thread; update the logic used by
adjustIOThreadsByEventLoad to also consider an in-flight-AOF flag (e.g.,
server.aof_io_in_progress or server.aof_flush_in_progress) that is set when
flushAppendOnlyFile() hands work to an IO thread and cleared when that work
completes; if such a flag does not exist, add it and ensure
flushAppendOnlyFile() sets/clears it around the handoff so aof_needs_io becomes
(server.aof_state != AOF_OFF && server.aof_fsync == AOF_FSYNC_ALWAYS &&
(sdslen(server.aof_buf) > 0 || server.aof_io_in_progress)) before calling
adjustIOThreadsByEventLoad.
- Around line 1831-1838: The blocked-loop branch that skips
handleClientsWithPendingWrites() also needs to run the AOF durability
bookkeeping so replies unblocked by the IO thread get their fsynced_reploff
updated and notifyDurabilityProgress() called; update the branch so when aof IO
is offloaded (aofIOFlushInProgress() true) you still invoke the same
durability-path steps used in the normal path—call
processAofIOThreadFlushResult() (or the existing helper that updates
fsynced_reploff) and then notifyDurabilityProgress() after
processEventsWhileBlocked() completes, ensuring consistency with
handleClientsWithPendingWrites() behavior and thread-safety in the threaded I/O
path.
In `@tests/durability/reply_blocking.tcl`:
- Around line 651-653: Replace the fixed wall-clock sleeps ("after 100", "after
200") with condition-based waits using the test helper wait_for_condition:
identify the points where the test currently does "after 100"/"after 200" (e.g.,
before asserting reply arrival, waiting-ack metric, or expected push message)
and instead poll for the observable state (reply received, server waiting-ack
counter updated, or push message seen) with a reasonable timeout; update the
test cases referencing these sleeps (the occurrences noted around the reply
arrival and push/ack assertions) to call wait_for_condition with a predicate
that checks the actual state before proceeding so the assertions become
deterministic.
- Around line 572-590: The FUNCTION LOAD/DELETE tests rely on a prior test's
library ("durtest"), breaking isolation; modify both tests so each is
self-contained by creating any required library before deleting or validating it
and cleaning it up afterwards: in the "($provider_mode) FUNCTION LOAD blocks
reply until provider acks" test ensure it registers a unique function/library
name (or explicitly create "durtest" at test start) instead of assuming it
exists, and in the corresponding FUNCTION DELETE test (around the 592-610
region) make it create the library/function it will delete (use unique name or
same scoped creation), then perform DELETE and assertions, and finally close the
client ($rd close) to fully teardown; update usages of server.register_function,
FUNCTION LOAD and FUNCTION DELETE calls and any assertions to operate on the
locally-created resource so tests no longer depend on execution order.
- Around line 932-935: Replace the broad psubscribe $rd1 * with a subscription
to the specific keyspace/keyevent pattern that the test exercises (e.g.
subscribe to the set event for the test key) so the assertion only observes the
intended message; update the psubscribe call that uses rd1 (created by
valkey_deferring_client) to a precise pattern such as the keyevent or keyspace
pattern for the "set" event (e.g. "__keyevent@0__:set" or the equivalent
keyspace pattern for "foo") so the subsequent [$rd1 read] assertion only matches
the produced set notification from r set foo bar.
---
Outside diff comments:
In `@src/server.h`:
- Around line 1168-1253: The ClientFlags bitfield now defines 65 one-bit members
(notably durable_blocked_client) which overflows the 64-bit raw_flag union used
elsewhere; fix by moving durable_blocked_client out of the ClientFlags union (or
otherwise reducing fields to <=64 bits) so that raw_flag = 0 actually clears all
flags. Concretely, remove durable_blocked_client from the ClientFlags struct and
add a separate uint8_t/uint64_t flag field (e.g., extra_client_flags or a
standalone durable_blocked_client boolean) on the client struct, update
references to durable_blocked_client to use the new field, and ensure codepaths
that reset flags (code using raw_flag = 0 in functions referenced in src/aof.c,
src/networking.c, src/module.c) still only clear the original 64-bit raw_flag
while the new flag is explicitly initialized/cleared where clients are
created/freed.
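A minimal sketch of the fix described above, with illustrative names rather than the real ClientFlags layout: the 64-bit raw_flag union keeps at most 64 one-bit members, and the overflowing flag becomes a plain field next to it:

```c
#include <assert.h>
#include <stdint.h>

/* Illustrative only: a flags union whose bitfield members fit exactly in
 * the 64-bit raw_flag, mirroring the constraint on the real ClientFlags. */
typedef union {
    struct {
        uint64_t blocked : 1;
        uint64_t dirty_cas : 1;
        /* ... remaining one-bit members, 64 bits in total ... */
        uint64_t reserved : 62;
    } f;
    uint64_t raw_flag; /* raw_flag = 0 clears exactly these 64 bits */
} ClientFlagsSketch;

typedef struct {
    ClientFlagsSketch flags;
    /* The 65th flag lives outside the union, so clearing raw_flag can no
     * longer silently drop (or fail to clear) it. */
    uint8_t durable_blocked_client;
} ClientSketch;

static void resetClientFlags(ClientSketch *c) {
    c->flags.raw_flag = 0;
    /* durable_blocked_client is intentionally untouched here; it must be
     * initialized/cleared explicitly where clients are created and freed. */
}
```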
---
Nitpick comments:
In `@src/aof.c`:
- Around line 55-70: The change adds a cross-thread AOF durability/flush state
machine in src/aof.c (see enum AOF_IO_FLUSH_*, struct aofIOFlushJob, and
functions processAofIOThreadFlushResult + tryOffloadAofAlwaysFlushToIOThreads)
and must include an explicit architectural review request per project
guidelines; update the diff by inserting a clear code comment or TODO at the top
of the new section (near the enum or above the two static function declarations)
that requests an `@core-team` architectural review for this change and briefly
states the reason (cross-thread durability/flush state machine), and also add a
short note in the PR description referencing this comment so reviewers see it.
- Around line 1183-1293: Add function-level comments to document the IO-thread
AOF flush invariants: in aofIOThreadFlushJobHandler,
processAofIOThreadFlushResult, and tryOffloadAofAlwaysFlushToIOThreads explain
that ownership of server.aof_buf is transferred to the IO worker when
tryOffloadAofAlwaysFlushToIOThreads hands off a job, that only one flush may be
in flight (enforced by server.aof_io_flush_state values
AOF_IO_FLUSH_PENDING/DONE/ERR/IDLE), that DONE or ERR results must be consumed
on the main thread by processAofIOThreadFlushResult before another flush is
started, and clarify the ordering guarantees (which atomics control the
producer/consumer handoff and the use of memory_order_acquire/release), plus
when notifyDurabilityProgress() is used to unblock clients; place these
explanatory comments immediately above each of the three functions named above.
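The invariants this comment asks to document can be sketched as a producer/consumer handoff on a single atomic state variable. State names mirror the review comment; the real code additionally transfers ownership of server.aof_buf inside the job struct:

```c
#include <assert.h>
#include <stdatomic.h>

/* Sketch of the single-in-flight flush state machine. */
enum {
    AOF_IO_FLUSH_IDLE,    /* no offloaded flush outstanding */
    AOF_IO_FLUSH_PENDING, /* job handed to an IO thread (buffer ownership moved) */
    AOF_IO_FLUSH_DONE,    /* IO thread finished write+fsync successfully */
    AOF_IO_FLUSH_ERR      /* IO thread hit an error */
};

static _Atomic int aof_io_flush_state = AOF_IO_FLUSH_IDLE;

/* Main thread: a new flush may start only from IDLE, which enforces that
 * at most one flush is in flight. Release ordering publishes the job
 * contents to the IO thread that observes PENDING. */
static int tryStartOffloadedFlush(void) {
    int expected = AOF_IO_FLUSH_IDLE;
    return atomic_compare_exchange_strong_explicit(
        &aof_io_flush_state, &expected, AOF_IO_FLUSH_PENDING,
        memory_order_release, memory_order_relaxed);
}

/* IO thread: after write+fsync, publish the outcome with release so the
 * main thread's acquire load sees every side effect of the flush. */
static void ioThreadCompleteFlush(int ok) {
    atomic_store_explicit(&aof_io_flush_state,
                          ok ? AOF_IO_FLUSH_DONE : AOF_IO_FLUSH_ERR,
                          memory_order_release);
}

/* Main thread: DONE/ERR must be consumed here (in the real code this is
 * where fsynced_reploff is updated and notifyDurabilityProgress() runs to
 * unblock waiting clients) before another flush may start. */
static int consumeFlushResult(void) {
    int s = atomic_load_explicit(&aof_io_flush_state, memory_order_acquire);
    if (s == AOF_IO_FLUSH_DONE || s == AOF_IO_FLUSH_ERR)
        atomic_store_explicit(&aof_io_flush_state, AOF_IO_FLUSH_IDLE,
                              memory_order_relaxed);
    return s;
}
```

The acquire/release pair is the ordering guarantee the comment should spell out: the IO thread's release store pairs with the main thread's acquire load, so the flush's effects are visible before the result is acted on.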
In `@src/object.c`:
- Around line 1163-1171: Add a function-level C doc comment above
getIntFromObject describing its contract: it parses a Redis object as an integer
using getLongLongFromObject, enforces that the parsed value fits into C int
bounds (INT_MIN..INT_MAX), stores the result into the out-parameter target on
success, and returns C_OK on success or C_ERR on failure; document arguments
(robj *o, int *target), return values (C_OK/C_ERR), the error conditions
(non-integer object or out-of-range), and any relevant notes (no allocation,
callers must provide non-NULL target). Ensure the comment follows the project's
C comment style and is placed immediately above the getIntFromObject definition.
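For illustration, here is a standalone function carrying the kind of contract comment requested. The real getIntFromObject takes a robj* and delegates to getLongLongFromObject; that step is mimicked here with strtoll on a plain string so the sketch stays self-contained:

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>

#define C_OK 0
#define C_ERR -1

/* Parse 's' as an integer that must fit in a C int.
 *
 * On success, stores the parsed value in '*target' and returns C_OK.
 * Returns C_ERR if 's' is not a valid integer or the value falls outside
 * INT_MIN..INT_MAX. Performs no allocation; 'target' must be non-NULL. */
static int getIntFromString(const char *s, int *target) {
    char *end;
    errno = 0;
    long long v = strtoll(s, &end, 10);
    if (errno != 0 || end == s || *end != '\0') return C_ERR;
    if (v < INT_MIN || v > INT_MAX) return C_ERR;
    *target = (int)v;
    return C_OK;
}
```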
In `@src/script.c`:
- Line 205: Replace the C++-style single-line comment starting with "// check if
sync replication would want to stop the execution." with a C-style comment using
/* ... */ in src/script.c so the comment reads /* check if sync replication
would want to stop the execution. */; locate the exact comment text to make the
change (no code logic changes required).
📒 Files selected for processing (31)
.gitignore, cmake/Modules/SourceFiles.cmake, src/Makefile, src/aof.c, src/config.c, src/db.c, src/debug.c, src/durability_provider.c, src/durability_provider.h, src/durable_task.c, src/durable_task.h, src/evict.c, src/io_threads.c, src/io_threads.h, src/module.c, src/networking.c, src/notify.c, src/object.c, src/replication.c, src/reply_blocking.c, src/reply_blocking.h, src/script.c, src/server.c, src/server.h, src/uncommitted_keys.c, src/uncommitted_keys.h, src/unit/CMakeLists.txt, src/unit/test_files.h, src/unit/test_reply_blocking.cpp, tests/durability/reply_blocking.tcl, tests/durability/sync-replication.tcl
👮 Files not reviewed due to content moderation or server errors (3)
- src/replication.c
- src/unit/test_reply_blocking.cpp
- src/uncommitted_keys.h
A new `min-sync-replicas` config

Is it possible to make that configurable per user connection? This way the end user could balance durability and performance/latency based on the importance of some data. I was thinking about writeConcern in mongo; it can be set at the operation or database level.

Yeah I see databases with
How would this work during a rolling upgrade? Let's say each shard has 1 primary and 1 replica.
Before removing the old replica, we must ensure that the new replica has got all the writes. If we only require one replica, it can be that the old replica had all the writes when it shut down, but the new replica doesn't. Does the operator need to change the config to 2 at some step during the upgrade and later change it back to 1? Or change a config on the replicas? What and when exactly?
Sync Replication — Commit Protocol
The intention of this PR is to seek early feedback from the community regarding sync replication. It is based on top of the open PR for AOF-based durability - #3381
This is the first PR in a multi-phase effort to add durability guarantees to Valkey through synchronous replication. It introduces the commit protocol based on the PacificA framework, where a write is only acknowledged to the client once it has been replicated to a configured number of replicas. We use the term sync replica to refer to a replica which contributes to durability and is conceptually the same as a "replica in the configuration" in PacificA terminology.
This PR focuses on the steady-state commit path. Failover, reconciliation, snapshots, lease mechanism, etc are intentionally out of scope and will be addressed in follow-up PRs.
What this PR covers
Note - See the Customer Experience section below to understand how these new configs would be used in practice.
Primary side
- `min-sync-replicas` config. When > 0, sync replication is enabled.
- If healthy sync replicas fall below `min-sync-replicas`, writes are rejected with CLUSTERDOWN.
- `min-sync-replica` faults.
- committed offset = minimum ack offset across the sync replica set. Durability can be configured as the AND of the AOF + sync replication providers.

Replica side
- `sync-eligible` config that operators set on replicas they want to participate as sync replicas. This allows the operator to have 1P + X sync replicas + Y async read replicas.
- `replconf capa sync` is sent during the handshake so that the primary knows which replicas can be considered sync replicas once they catch up.
- `repl_ack_off >= committed_offset && replication lag within threshold` is the condition for a replica to join the sync replica set.
- On `REPLCONF COMMIT <offset>` from the primary, replicas advance their local committed offset and unblock clients if needed.

Customer Experience
A developer sets up a cluster with a config as such:

- `sync-eligible: yes`. Here N2 and N3 strive to be sync replicas. N4 is only a read replica which does not contribute to durability.

Let's consider the cluster is bootstrapped and we are in a state where N1 is primary, N2 and N3 have been included in the sync replica set, and N4 is operating as an async read replica.

- `min-sync-replica` (1)
- `min-sync-replica` (1)

Note - the commit offset is only progressed based on the current list of sync replicas.
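The commit rule stated under "Primary side" (committed offset = minimum ack offset across the sync replica set) can be sketched as follows. The function name is illustrative, not the actual replicationProviderGetAckedOffset() signature:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Illustrative: the committed offset is the minimum acknowledged offset
 * across the current sync replica set, so it only advances once every
 * sync replica has acked at least that many bytes of the stream. */
static uint64_t committedOffset(const uint64_t *ack_offsets, size_t n) {
    if (n == 0) return 0; /* empty sync replica set: nothing to commit */
    uint64_t min = ack_offsets[0];
    for (size_t i = 1; i < n; i++)
        if (ack_offsets[i] < min) min = ack_offsets[i];
    return min;
}
```

For instance, if N2 has acked offset 120 and N3 only 80 (hypothetical numbers), the committed offset is 80; N4's async ack offset is simply not part of the input.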
Why no separate sequence number in replication stream
PacificA describes the protocol in terms of monotonic serial numbers (SN) per request. Valkey's replication stream already provides an equivalent:
- `primary_repl_offset` is a monotonic byte offset that uniquely identifies every position in the stream. Ordering and continuity come from the stream itself — there are no gaps, and replicas apply updates in offset order.

This PR tracks a `committed_offset` in the durability subsystem, which corresponds exactly to PacificA's committed point on the prepared list, just expressed in bytes rather than a discrete counter. Together, `primary_repl_offset` (the sequence) and `committed_offset` (the commit point) cover every role PacificA assigns to the serial number: total ordering, gap detection, commit tracking. No new counter is introduced. Reconciliation will be covered later, but the idea is to use the PSYNC mechanism with an epoch/config version stored in the raft-enabled cluster bus.

Note
What's coming in follow-up PRs
How to review PR
ISR management: replication.c - search for REPLICA_CAPA_SYNC (promotion).
Commit offset calculation: durability_provider.c — replicationProviderGetAckedOffset() computes min ack across ISR; getDurabilityConsensusOffset() combines all providers.
Write rejection: server.c — processCommand() calls checkSyncReplicasStatus(), rejects with CLUSTERDOWN if ISR < threshold.
REPLCONF COMMIT sent by primary: replication.c — search for commit_argv in replicationCron().
REPLCONF COMMIT processed by replica: replication.c — search for "commit" in replconfCommand().
Uncommitted key tracking (WBL):
New config - config.c
Tests: sync-replication.tcl — good starting point to understand expected behavior.