Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
02deba7
aof: offload appendfsync=always flush+fsync to IO threads
jjuleslasarte Mar 18, 2026
75cc8de
durability: add pluggable durability provider framework
jjuleslasarte Mar 18, 2026
05c1437
durability: add deferred task system for post-ack execution
jjuleslasarte Mar 18, 2026
f15f181
durability: add uncommitted key tracking per database
jjuleslasarte Mar 18, 2026
72a31e2
durability: add reply blocking and wire into server subsystems
jjuleslasarte Mar 18, 2026
3253c5c
build: add durability source files to Makefile and CMake
jjuleslasarte Mar 18, 2026
a76070b
tests: add unit tests for durability reply blocking
jjuleslasarte Mar 18, 2026
85aa8e7
tests: add integration tests for durability reply blocking
jjuleslasarte Mar 18, 2026
b125b02
Merge branch 'unstable' into aof-durability
jjuleslasarte Mar 19, 2026
cd59c4f
Merge branch 'unstable' into aof-durability
jjuleslasarte Mar 20, 2026
b285ba5
Remove separate durability config; derive from appendfsync always
jjuleslasarte Apr 1, 2026
4daeb60
Refactor uncommitted keys: mark dirty immediately, use offset tracker…
jjuleslasarte Apr 1, 2026
b29770b
Merge upstream/unstable
jjuleslasarte Apr 1, 2026
7a78154
isDurabilityEnabled: delegate to anyDurabilityProviderEnabled
jjuleslasarte Apr 1, 2026
9007510
Fix dangling event string in keyspace notify task
jjuleslasarte Apr 1, 2026
45ded39
fix: fix 32-bit build for durability source files
jjuleslasarte Apr 2, 2026
a63f9ed
fix: use bufpos instead of data_len for copy-avoidance compatible com…
jjuleslasarte Apr 2, 2026
0d49022
Fix CR feedback: stale comment, 32-bit build, and replica rejection l…
jjuleslasarte Apr 7, 2026
5c4b9c5
Address PR #3381 review feedback
jjuleslasarte Apr 27, 2026
330d581
refactor: replace offset tracker queue with hashtable iteration
jjuleslasarte Apr 30, 2026
110fc8c
fix: zero-initialize clientDurabilityInfo and clear stale uncommitted…
jjuleslasarte Apr 30, 2026
f989319
Sync-replication: Part 1
May 3, 2026
fdc7429
Sync replication: WBL
May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ cmake-build-debug/
cmake-build-release/
__pycache__
src/unit/.flags
.DS_Store
4 changes: 4 additions & 0 deletions cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/ae.c
${CMAKE_SOURCE_DIR}/src/anet.c
${CMAKE_SOURCE_DIR}/src/dict.c
${CMAKE_SOURCE_DIR}/src/reply_blocking.c
${CMAKE_SOURCE_DIR}/src/durable_task.c
${CMAKE_SOURCE_DIR}/src/durability_provider.c
${CMAKE_SOURCE_DIR}/src/uncommitted_keys.c
${CMAKE_SOURCE_DIR}/src/hashtable.c
${CMAKE_SOURCE_DIR}/src/kvstore.c
${CMAKE_SOURCE_DIR}/src/sds.c
Expand Down
4 changes: 4 additions & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,10 @@ ENGINE_SERVER_OBJ = \
rdma.o \
release.o \
replication.o \
reply_blocking.o \
durable_task.o \
durability_provider.o \
uncommitted_keys.o \
resp_parser.o \
rio.o \
script.o \
Expand Down
158 changes: 158 additions & 0 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include "server.h"
#include "bio.h"
#include "io_threads.h"
#include "rio.h"
#include "functions.h"
#include "module.h"
Expand All @@ -51,6 +52,23 @@ aofManifest *aofLoadManifestFromFile(sds am_filepath);
void aofManifestFreeAndUpdate(aofManifest *am);
void aof_background_fsync_and_close(int fd);

/* States of the single in-flight AOF flush job handed off to the IO threads.
 * Stored in server.aof_io_flush_state and accessed atomically: the IO thread
 * publishes DONE/ERR with release semantics, the main thread consumes with
 * acquire semantics and resets the state back to IDLE. */
enum {
    AOF_IO_FLUSH_IDLE = 0, /* No job in flight; a new one may be submitted. */
    AOF_IO_FLUSH_PENDING,  /* Job submitted; IO thread is writing/fsyncing. */
    AOF_IO_FLUSH_DONE,     /* IO thread succeeded; main thread must consume. */
    AOF_IO_FLUSH_ERR,      /* IO thread failed; errno stored for main thread. */
};

/* Everything an IO thread needs to flush the AOF buffer in the background.
 * Ownership of both `buf` and the job struct itself transfers to the IO
 * thread, which frees them when the job completes (success or error). */
typedef struct aofIOFlushJob {
    int fd;            /* AOF file descriptor to write to and fsync. */
    sds buf;           /* Detached server.aof_buf; freed by the IO thread. */
    size_t len;        /* Byte length of buf at submission time. */
    long long reploff; /* Replication offset made durable once this flush lands. */
} aofIOFlushJob;

static void processAofIOThreadFlushResult(void);
static int tryOffloadAofAlwaysFlushToIOThreads(void);

/* ----------------------------------------------------------------------------
* AOF Manifest file implementation.
*
Expand Down Expand Up @@ -952,6 +970,9 @@ void stopAppendOnly(void) {
server.aof_last_incr_size = 0;
server.aof_last_incr_fsync_offset = 0;
server.fsynced_reploff = -1;
atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_IDLE, memory_order_relaxed);
atomic_store_explicit(&server.aof_io_flush_errno, 0, memory_order_relaxed);
atomic_store_explicit(&server.aof_io_flush_size, 0, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, 0, memory_order_relaxed);
killAppendOnlyChild();
sdsfree(server.aof_buf);
Expand Down Expand Up @@ -1002,6 +1023,9 @@ int startAppendOnly(void) {
serverLog(LL_WARNING, "AOF reopen, just ignore the last error.");
server.aof_last_write_status = C_OK;
}
atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_IDLE, memory_order_relaxed);
atomic_store_explicit(&server.aof_io_flush_errno, 0, memory_order_relaxed);
atomic_store_explicit(&server.aof_io_flush_size, 0, memory_order_relaxed);
return C_OK;
}

Expand Down Expand Up @@ -1156,6 +1180,118 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) {
return totwritten;
}

/* IO-thread entry point: write the detached AOF buffer and fsync the file.
 *
 * On success, publish the covered replication offset and written size (relaxed)
 * and then the DONE state (release) so the main thread's acquire load sees the
 * payload. On failure, publish errno and the ERR state the same way.
 *
 * The original version used a `done:` label that was in fact only reached on
 * the error path, and duplicated the sdsfree/zfree cleanup in both paths;
 * the cleanup is now unified on a single exit. Takes ownership of `data`
 * (an aofIOFlushJob) and frees it and its buffer. */
static void aofIOThreadFlushJobHandler(void *data) {
    aofIOFlushJob *job = data;
    int err = 0;

    ssize_t nwritten = aofWrite(job->fd, job->buf, job->len);
    if (nwritten != (ssize_t)job->len) {
        /* A short write with no errno means the device is out of space. */
        err = (nwritten == -1) ? errno : ENOSPC;
    } else if (valkey_fsync(job->fd) == -1) {
        err = errno;
    }

    if (err == 0) {
        atomic_store_explicit(&server.fsynced_reploff_pending, job->reploff, memory_order_relaxed);
        atomic_store_explicit(&server.aof_io_flush_size, job->len, memory_order_relaxed);
        atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_DONE, memory_order_release);
    } else {
        atomic_store_explicit(&server.aof_io_flush_errno, err, memory_order_relaxed);
        atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_ERR, memory_order_release);
    }

    sdsfree(job->buf);
    zfree(job);
}

/* Return non-zero while a flush job handed to the IO threads has been
 * submitted but not yet completed (state is PENDING). */
int aofIOFlushInProgress(void) {
    int state = atomic_load_explicit(&server.aof_io_flush_state, memory_order_acquire);
    return state == AOF_IO_FLUSH_PENDING;
}

/* Main-thread consumer of the IO-thread flush outcome.
 *
 * DONE: fold the written byte count into the AOF size bookkeeping, clear a
 * previous write-error status if any, reset the state machine to IDLE and
 * notify the durability layer so blocked clients can be released.
 * ERR: record errno, log and exit - this path only runs for
 * appendfsync=always, where a lost write cannot be tolerated.
 * IDLE/PENDING: nothing to do yet. */
static void processAofIOThreadFlushResult(void) {
    int state = atomic_load_explicit(&server.aof_io_flush_state, memory_order_acquire);

    switch (state) {
    case AOF_IO_FLUSH_IDLE:
    case AOF_IO_FLUSH_PENDING: return;
    case AOF_IO_FLUSH_DONE: {
        off_t nwritten = atomic_load_explicit(&server.aof_io_flush_size, memory_order_relaxed);
        server.aof_current_size += nwritten;
        server.aof_last_incr_size += nwritten;
        server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
        server.aof_last_fsync = server.mstime;
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_NOTICE, "AOF write error looks solved. The server can write again.");
            server.aof_last_write_status = C_OK;
        }
        atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_IDLE, memory_order_release);

        /* Notify sync replication that AOF fsync completed so blocked clients can be unblocked */
        notifyDurabilityProgress();
        return;
    }
    default: {
        int err = atomic_load_explicit(&server.aof_io_flush_errno, memory_order_relaxed);
        server.aof_last_write_errno = err;
        atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_IDLE, memory_order_release);

        /* IO thread flush is only used with appendfsync=always, so an error here
         * is always fatal. We cannot guarantee durability. */
        serverLog(LL_WARNING,
                  "Can't persist AOF from IO thread for "
                  "AOF fsync policy 'always': %s. Exiting...",
                  strerror(err));
        exit(1);
    }
    }
}

/* Try to hand the current AOF buffer to an IO thread for write+fsync so the
 * main thread is not stalled by appendfsync=always.
 *
 * Returns C_OK when the buffer was handed off (server.aof_buf is replaced
 * with a fresh empty sds and ownership of the old buffer passes to the IO
 * thread), C_ERR when offloading is not possible and the caller must flush
 * inline. */
static int tryOffloadAofAlwaysFlushToIOThreads(void) {
    /* Only offload for appendfsync=always, and only when there is data. */
    if (server.aof_fsync != AOF_FSYNC_ALWAYS || sdslen(server.aof_buf) == 0) {
        return C_ERR;
    }

    /* The state machine must be fully IDLE before submitting a new job:
     * PENDING means a flush is still in flight, and DONE/ERR means
     * processAofIOThreadFlushResult() has not consumed the previous result
     * yet. This single check subsumes aofIOFlushInProgress(), which only
     * tests for PENDING, so no separate in-progress check is needed. */
    if (atomic_load_explicit(&server.aof_io_flush_state, memory_order_acquire) != AOF_IO_FLUSH_IDLE) {
        return C_ERR;
    }

    /* If IO threads are configured but not active, we can't offload.
     * Note: Thread activation based on AOF workload is handled by
     * adjustIOThreadsByEventLoad() via the has_background_work parameter. */
    if (server.io_threads_num <= 1 || server.active_io_threads_num <= 1) {
        return C_ERR;
    }

    /* NOTE: With sync replication enabled, we still want to offload fsync to
     * IO threads to avoid blocking the main thread. The notifyDurabilityProgress()
     * callback will be invoked in beforeSleep() when we check for completed IO thread
     * jobs, which will then unblock waiting clients. This adds at most one
     * event loop iteration of latency but keeps the main thread responsive. */

    aofIOFlushJob *job = zmalloc(sizeof(*job));
    job->fd = server.aof_fd;
    job->buf = server.aof_buf;
    job->len = sdslen(job->buf);
    job->reploff = server.primary_repl_offset;

    /* Publish a clean PENDING state before the job becomes visible to the IO
     * thread (the release store pairs with the acquire loads above and in
     * aofIOFlushInProgress()). */
    atomic_store_explicit(&server.aof_io_flush_errno, 0, memory_order_relaxed);
    atomic_store_explicit(&server.aof_io_flush_size, 0, memory_order_relaxed);
    atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_PENDING, memory_order_release);
    if (trySendJobToIOThreads(aofIOThreadFlushJobHandler, job) == C_OK) {
        /* Hand off the buffer to the IO thread; allocate a fresh one for new writes. */
        server.aof_buf = sdsempty();
        server.aof_flush_postponed_start = 0;
        return C_OK;
    }

    /* Submission failed: roll back to IDLE and let the caller flush inline. */
    atomic_store_explicit(&server.aof_io_flush_state, AOF_IO_FLUSH_IDLE, memory_order_release);
    zfree(job);
    return C_ERR;
}

/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
Expand All @@ -1180,6 +1316,23 @@ void flushAppendOnlyFile(int force) {
int sync_in_progress = 0;
mstime_t latency;

processAofIOThreadFlushResult();
if (aofIOFlushInProgress()) {
if (!force) return;
/* Busy-wait for the IO thread to finish. Timeout after 30 seconds
* to prevent hanging indefinitely. */
monotime wait_start = getMonotonicUs();
while (aofIOFlushInProgress()) {
usleep(100);
processAofIOThreadFlushResult();
if (getMonotonicUs() - wait_start > 30 * 1000000ULL) {
serverLog(LL_WARNING,
"Timed out waiting for AOF IO thread flush to complete. Exiting...");
exit(1);
}
}
}
Comment thread
sushilpaneru1 marked this conversation as resolved.

if (sdslen(server.aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
Expand Down Expand Up @@ -1234,6 +1387,11 @@ void flushAppendOnlyFile(int force) {
"without waiting for fsync to complete, this may slow down the server.");
}
}

if (server.aof_fsync == AOF_FSYNC_ALWAYS && !force && tryOffloadAofAlwaysFlushToIOThreads() == C_OK) {
return;
}

/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
Expand Down
8 changes: 8 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2605,6 +2605,9 @@ static int updateAppendOnly(const char **err) {
return 0;
}
}
/* Durability is implied by appendfsync always + AOF on, so toggling
* appendonly may enable or disable it. */
durabilityReset();
return 1;
}

Expand Down Expand Up @@ -2694,6 +2697,9 @@ int updateAppendFsync(const char **err) {
* worker thread. */
bioDrainWorker(BIO_AOF_FSYNC);
}
/* Durability is implied by appendfsync always + AOF on, so toggling
* appendfsync may enable or disable it. */
durabilityReset();
return 1;
}

Expand Down Expand Up @@ -3259,6 +3265,7 @@ standardConfig static_configs[] = {
createBoolConfig("repl-mptcp", NULL, IMMUTABLE_CONFIG, server.repl_mptcp, 0, isValidMptcp, NULL),
createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 1, NULL, NULL),
createBoolConfig("dual-channel-replication-enabled", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.dual_channel_replication, 0, NULL, NULL),
createBoolConfig("sync-eligible", NULL, MODIFIABLE_CONFIG, server.sync_eligible, 0, NULL, NULL),
Comment thread
sushilpaneru1 marked this conversation as resolved.
createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.aof_rewrite_incremental_fsync, 1, NULL, NULL),
createBoolConfig("no-appendfsync-on-rewrite", NULL, MODIFIABLE_CONFIG, server.aof_no_fsync_on_rewrite, 0, NULL, NULL),
createBoolConfig("cluster-require-full-coverage", NULL, MODIFIABLE_CONFIG, server.cluster_require_full_coverage, 1, NULL, NULL),
Expand Down Expand Up @@ -3397,6 +3404,7 @@ standardConfig static_configs[] = {
createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ),
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_replicas_to_write, 0, INTEGER_CONFIG, NULL, updateGoodReplicas),
createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_replicas_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodReplicas),
createIntConfig("min-sync-replicas", NULL, IMMUTABLE_CONFIG, 0, 6, server.min_sync_replicas, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
Expand Down
23 changes: 23 additions & 0 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,9 @@ long long dbTotalServerKeyCount(void) {
* a context of a client. */
void signalModifiedKey(client *c, serverDb *db, robj *key) {
    touchWatchedKey(db, key);
    /* If the durability layer claims this modification it takes over from
     * here; otherwise fall through to the usual client-side caching
     * invalidation. */
    if (!durabilitySignalModifiedKey(c, db, key)) {
        trackingInvalidateKey(c, key, 1);
    }
}

Expand All @@ -770,6 +773,10 @@ void signalFlushedDb(int dbid, int async) {
touchAllWatchedKeysInDb(server.db[j], NULL);
}

if (durabilitySignalFlushedDb(dbid)) {
return;
}

trackingInvalidateKeysOnFlush(async);

/* Changes in this method may take place in swapMainDbWithTempDb as well,
Expand Down Expand Up @@ -1766,6 +1773,13 @@ int dbSwapDatabases(int id1, int id2) {
db2->keys_with_volatile_items = aux.keys_with_volatile_items;
copyDbExpiry(db2, &aux);

/* Swap uncommitted key tracking so it stays consistent with the key data. */
db1->uncommitted_keys = db2->uncommitted_keys;
db1->dirty_repl_offset = db2->dirty_repl_offset;

db2->uncommitted_keys = aux.uncommitted_keys;
db2->dirty_repl_offset = aux.dirty_repl_offset;

/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
* X in a given DB, may now actually be unblocked if X happens
Expand Down Expand Up @@ -1811,6 +1825,13 @@ void swapMainDbWithTempDb(serverDb **tempDb) {
newdb->keys_with_volatile_items = aux.keys_with_volatile_items;
copyDbExpiry(newdb, &aux);

/* Swap uncommitted key tracking so it stays consistent with the key data. */
activedb->uncommitted_keys = newdb->uncommitted_keys;
activedb->dirty_repl_offset = newdb->dirty_repl_offset;

newdb->uncommitted_keys = aux.uncommitted_keys;
newdb->dirty_repl_offset = aux.dirty_repl_offset;
Comment thread
sushilpaneru1 marked this conversation as resolved.

/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
* X in a given DB, may now actually be unblocked if X happens
Expand Down Expand Up @@ -1946,6 +1967,7 @@ void deleteExpiredKeyAndPropagateWithDictIndex(serverDb *db, robj *keyobj, int d
notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired", keyobj, db->id);
signalModifiedKey(NULL, db, keyobj);
propagateDeletion(db, keyobj, server.lazyfree_lazy_expire, dict_index);
if (isPrimaryDurabilityEnabled()) handleUncommittedKeyForClient(NULL, keyobj, db);
server.stat_expiredkeys++;
}

Expand Down Expand Up @@ -2077,6 +2099,7 @@ size_t dbReclaimExpiredFields(robj *o, serverDb *db, mstime_t now, unsigned long
if (!hashTypeHasVolatileFields(o)) dbUntrackKeyWithVolatileItems(db, o);
}
signalModifiedKey(NULL, db, keyobj);
if (isPrimaryDurabilityEnabled()) handleUncommittedKeyForClient(NULL, keyobj, db);
exitExecutionUnit();
postExecutionUnitOperations();
decrRefCount(keyobj);
Expand Down
32 changes: 32 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "io_threads.h"
#include "sds.h"
#include "module.h"
#include "durability_provider.h"

#include <arpa/inet.h>
#include <signal.h>
Expand Down Expand Up @@ -1066,6 +1067,37 @@ void debugCommand(client *c) {
} else if (!strcasecmp(objectGetVal(c->argv[1]), "client-enforce-reply-list") && c->argc == 3) {
server.debug_client_enforce_reply_list = atoi(objectGetVal(c->argv[2]));
addReply(c, shared.ok);
} else if (!strcasecmp(objectGetVal(c->argv[1]), "durability-provider-pause") && c->argc == 3) {
if (pauseDurabilityProvider(objectGetVal(c->argv[2]))) {
addReply(c, shared.ok);
} else {
addReplyError(c, "No such durability provider");
}
} else if (!strcasecmp(objectGetVal(c->argv[1]), "durability-provider-resume") && c->argc == 3) {
if (resumeDurabilityProvider(objectGetVal(c->argv[2]))) {
addReply(c, shared.ok);
} else {
addReplyError(c, "No such durability provider");
}
} else if (!strcasecmp(objectGetVal(c->argv[1]), "set-io-last-written") && c->argc == 5) {
/* DEBUG set-io-last-written <client-id> <bufpos> <data_len>
* Simulate a partial write state on a target client for testing.
* Sets io_last_written.buf to target->buf, bufpos and data_len to the given values.
* This allows injecting the post-partial-write state that triggers the
* data_len vs bufpos divergence with copy avoidance. */
long long client_id, bufpos_val, data_len_val;
if (getLongLongFromObjectOrReply(c, c->argv[2], &client_id, NULL) != C_OK) return;
if (getLongLongFromObjectOrReply(c, c->argv[3], &bufpos_val, NULL) != C_OK) return;
if (getLongLongFromObjectOrReply(c, c->argv[4], &data_len_val, NULL) != C_OK) return;
client *target = lookupClientByID((uint64_t)client_id);
if (target == NULL) {
addReplyError(c, "No such client");
return;
}
target->io_last_written.buf = target->buf;
target->io_last_written.bufpos = (size_t)bufpos_val;
target->io_last_written.data_len = (size_t)data_len_val;
Comment thread
sushilpaneru1 marked this conversation as resolved.
addReply(c, shared.ok);
} else if (!handleDebugClusterCommand(c)) {
addReplySubcommandSyntaxError(c);
return;
Expand Down
Loading
Loading