Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -8980,6 +8980,7 @@ void moduleHandleBlockedClients(void) {
* the key was signaled as ready. */
long long prev_error_replies = server.stat_total_error_replies;
uint64_t reply_us = 0;
int reply_ret = VALKEYMODULE_OK;
if (c && !bc->blocked_on_keys && bc->reply_callback) {
ValkeyModuleCtx ctx;
moduleCreateContext(&ctx, bc->module, VALKEYMODULE_CTX_BLOCKED_REPLY);
Expand All @@ -8989,10 +8990,26 @@ void moduleHandleBlockedClients(void) {
ctx.blocked_client = bc;
monotime replyTimer;
elapsedStart(&replyTimer);
bc->reply_callback(&ctx, (void **)c->argv, c->argc);
reply_ret = bc->reply_callback(&ctx, (void **)c->argv, c->argc);
reply_us = elapsedUs(replyTimer);
moduleFreeContext(&ctx);
}

/* If reply callback returned VALKEYMODULE_REPLY_AGAIN, keep the client
* blocked. The module must not have written any reply before returning
* this value. The module will call UnblockClient again later.
* The module must handle timeout/disconnect by calling UnblockClient
* from those callbacks (standard blocked client contract). */
if (reply_ret == VALKEYMODULE_REPLY_AGAIN) {
moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);
bc->reply_client = moduleAllocTempClient();
bc->thread_safe_ctx_client = moduleAllocTempClient();
Comment on lines +9004 to +9007
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Avoid recycling thread_safe_ctx_client across VALKEYMODULE_REPLY_AGAIN.

VM_GetThreadSafeContext() stores bc->thread_safe_ctx_client into ctx->client. Replacing that client here means any outstanding thread-safe contexts created before the first unblock now hold a dangling client pointer. If a worker thread keeps using the same context after returning VALKEYMODULE_REPLY_AGAIN, this turns into a use-after-free / cross-request state corruption bug.

As per coding guidelines, "Verify thread safety in threaded I/O paths in C code".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/module.c` around lines 9002 - 9005, The code releases and re-allocates
bc->thread_safe_ctx_client across VALKEYMODULE_REPLY_AGAIN which invalidates
ctx->client returned by VM_GetThreadSafeContext and can cause use-after-free;
fix by not recycling or replacing bc->thread_safe_ctx_client in this path —
remove or skip the moduleReleaseTempClient/moduleAllocTempClient calls for
bc->thread_safe_ctx_client here and only touch bc->reply_client (or allocate a
new thread-safe client per VM_GetThreadSafeContext and ensure the context
owns/releases it) so outstanding thread-safe contexts keep a stable client
pointer.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Restore the reply client's RESP mode after reblocking.

The initial block path copies bc->client->resp into bc->reply_client, but this branch allocates a fresh pooled client without doing the same. Any buffered reply emitted before the next unblock can then be serialized in the wrong RESP version.

Proposed fix
         if (reply_ret == VALKEYMODULE_REPLY_AGAIN) {
             moduleReleaseTempClient(bc->reply_client);
             moduleReleaseTempClient(bc->thread_safe_ctx_client);
             bc->reply_client = moduleAllocTempClient();
             bc->thread_safe_ctx_client = moduleAllocTempClient();
+            if (bc->client) bc->reply_client->resp = bc->client->resp;
             bc->unblocked = 0;
             pthread_mutex_lock(&moduleUnblockedClientsMutex);
             continue;
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);
bc->reply_client = moduleAllocTempClient();
bc->thread_safe_ctx_client = moduleAllocTempClient();
if (reply_ret == VALKEYMODULE_REPLY_AGAIN) {
moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);
bc->reply_client = moduleAllocTempClient();
bc->thread_safe_ctx_client = moduleAllocTempClient();
if (bc->client) bc->reply_client->resp = bc->client->resp;
bc->unblocked = 0;
pthread_mutex_lock(&moduleUnblockedClientsMutex);
continue;
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/module.c` around lines 9002 - 9005, When replacing the temporary clients
in the reblock path, copy the original client's RESP mode into the newly
allocated reply client so buffered replies use the correct RESP version: after
calling moduleReleaseTempClient and moduleAllocTempClient for bc->reply_client
(and similarly for bc->thread_safe_ctx_client if needed), set
bc->reply_client->resp = bc->client->resp (or use the appropriate accessor on
the new temp client) to restore the RESP mode that was set earlier in the
initial block path.

if (c) bc->reply_client->resp = c->resp;
bc->unblocked = 0;
pthread_mutex_lock(&moduleUnblockedClientsMutex);
continue;
}
/* Hold onto the blocked client if module auth is in progress. The reply callback is invoked
* when the client is reprocessed. */
if (c && clientHasModuleAuthInProgress(c)) {
Expand Down
3 changes: 3 additions & 0 deletions src/valkeymodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ typedef long long ustime_t;
#define VALKEYMODULE_OK 0
#define VALKEYMODULE_ERR 1

/* Return from reply callback to keep client blocked (re-block). */
#define VALKEYMODULE_REPLY_AGAIN 2

/* Module Based Authentication status return values. */
#define VALKEYMODULE_AUTH_HANDLED 0
#define VALKEYMODULE_AUTH_NOT_HANDLED 1
Expand Down
130 changes: 130 additions & 0 deletions tests/modules/blockedclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,124 @@ int unblock_by_timer(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc)
return VALKEYMODULE_OK;
}

/* --- REPLY_AGAIN (re-block) tests --- */

static int reblock_reply_count = 0;
static volatile int reblock_disconnect_called = 0;

/* Reply callback that re-blocks twice, then replies on the third call. */
int reblock_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
reblock_reply_count++;
if (reblock_reply_count < 3) {
/* Re-block: don't write any reply */
return VALKEYMODULE_REPLY_AGAIN;
}
/* Third time: actually reply */
ValkeyModule_ReplyWithLongLong(ctx, reblock_reply_count);
return VALKEYMODULE_OK;
}

void reblock_free(ValkeyModuleCtx *ctx, void *privdata) {
UNUSED(ctx);
UNUSED(privdata);
}

void reblock_disconnect(ValkeyModuleCtx *ctx, ValkeyModuleBlockedClient *bc) {
UNUSED(ctx);
reblock_disconnect_called++;
ValkeyModule_UnblockClient(bc, NULL);
}

static ValkeyModuleBlockedClient *reblock_bc = NULL;

void reblock_timer_cb(ValkeyModuleCtx *ctx, void *data) {
UNUSED(data);
ValkeyModule_UnblockClient(reblock_bc, NULL);
/* If not done yet, schedule the next unblock from this timer */
if (reblock_reply_count < 2) {
ValkeyModule_CreateTimer(ctx, 10, reblock_timer_cb, NULL);
}
}

/* Command: reblock.test — blocks, re-blocks twice, replies on third unblock */
int reblock_test_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
reblock_reply_count = 0;
reblock_bc = ValkeyModule_BlockClient(ctx, reblock_reply, NULL, reblock_free, 5000);
ValkeyModule_CreateTimer(ctx, 10, reblock_timer_cb, NULL);
return VALKEYMODULE_OK;
}

/* Reply callback that always re-blocks (to test timeout while re-blocked).
* Called exactly once — the initial UnblockClient triggers it, then the client
* stays re-blocked until timeout fires. */
static int reblock_timeout_reply_count = 0;

int reblock_timeout_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(ctx);
UNUSED(argv);
UNUSED(argc);
reblock_timeout_reply_count++;
assert(reblock_timeout_reply_count == 1);
return VALKEYMODULE_REPLY_AGAIN;
}

int reblock_timeout_cb(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
ValkeyModuleBlockedClient *bc = ValkeyModule_GetBlockedClientHandle(ctx);
ValkeyModule_ReplyWithSimpleString(ctx, "Timed out while re-blocked");
ValkeyModule_UnblockClient(bc, NULL);
return VALKEYMODULE_OK;
}

static ValkeyModuleBlockedClient *reblock_timeout_bc = NULL;

/* Command: reblock.timeout — blocks, re-blocks forever, should timeout */
int reblock_timeout_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
reblock_timeout_reply_count = 0;
reblock_timeout_bc = ValkeyModule_BlockClient(ctx, reblock_timeout_reply, reblock_timeout_cb, reblock_free, 500);
/* Unblock once to trigger the reply callback (which returns REPLY_AGAIN).
* After that, timeout at 500ms should fire. */
ValkeyModule_UnblockClient(reblock_timeout_bc, NULL);
return VALKEYMODULE_OK;
}

/* Reply callback that re-blocks until disconnect (reply cb is never called
* after disconnect since c == NULL, so this always returns REPLY_AGAIN) */
int reblock_disconnect_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(ctx);
UNUSED(argv);
UNUSED(argc);
return VALKEYMODULE_REPLY_AGAIN;
}

/* Command: reblock.disconnect — blocks, re-blocks, waits for disconnect.
* Use reblock.get_disconnect_called to verify disconnect callback fired. */
int reblock_disconnect_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
reblock_disconnect_called = 0;
ValkeyModuleBlockedClient *bc = ValkeyModule_BlockClient(ctx, reblock_disconnect_reply, NULL, reblock_free, 0);
ValkeyModule_SetDisconnectCallback(bc, reblock_disconnect);
/* Unblock once to trigger reply callback which then returns REPLY_AGAIN */
ValkeyModule_UnblockClient(bc, NULL);
return VALKEYMODULE_OK;
}

/* Command: reblock.get_disconnect_called — returns disconnect callback count */
int reblock_get_disconnect_called(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
ValkeyModule_ReplyWithLongLong(ctx, reblock_disconnect_called);
return VALKEYMODULE_OK;
}

int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);
Expand Down Expand Up @@ -1114,5 +1232,17 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
if (ValkeyModule_CreateCommand(ctx, "unblock_by_timer", unblock_by_timer, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "reblock.test", reblock_test_cmd, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "reblock.timeout", reblock_timeout_cmd, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "reblock.disconnect", reblock_disconnect_cmd, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "reblock.get_disconnect_called", reblock_get_disconnect_called, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

return VALKEYMODULE_OK;
}
41 changes: 40 additions & 1 deletion tests/unit/moduleapi/blockedclient.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,46 @@ foreach call_type {nested normal} {
# when the client is unlock, we will get the OK reply from timer.
assert_match "OK" [r unblock_by_timer 100 100]
}


test {REPLY_AGAIN re-blocks and eventually replies} {
# reblock.test: blocks, reply callback returns REPLY_AGAIN twice,
# then replies with the call count (3) on the third unblock.
set result [r reblock.test]
assert_equal 3 $result
}

test {REPLY_AGAIN with timeout fires timeout callback} {
# reblock.timeout: reply callback always returns REPLY_AGAIN,
# timeout is 500ms, should get timeout response.
set result [r reblock.timeout]
assert_match "*Timed out*" $result
}

test {REPLY_AGAIN disconnect fires disconnect callback} {
set rd [valkey_deferring_client]
$rd client id
set client_id [$rd read]
$rd reblock.disconnect
assert_equal 1 [r client kill id $client_id]
wait_for_condition 50 20 {
[r reblock.get_disconnect_called] eq 1
} else {
fail "Failed waiting for disconnect callback"
}
$rd close
}

test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} {
set rd [valkey_deferring_client]
$rd client id
set client_id [$rd read]
$rd reblock.timeout
assert_equal 1 [r client unblock $client_id timeout]
set result [$rd read]
assert_match "*Timed out*" $result
$rd close
}
Comment on lines +314 to +337
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make the re-block integration tests deterministic and enforce the intended path.

These tests currently rely on fixed sleeps and optional client kill/unblock, which can make them flaky and can let the CLIENT UNBLOCK case pass via natural timeout instead of the unblock path.

💡 Proposed fix
@@
     test {REPLY_AGAIN disconnect fires disconnect callback} {
         # Start reblock.disconnect on a separate client, then kill that client.
         set rd [valkey_deferring_client]
         $rd reblock.disconnect
-        after 100
-        # Get the client id
-        set clients [r client list]
         set target_id ""
-        foreach line [split $clients "\n"] {
-            if {[string match "*cmd=reblock.disconnect*" $line]} {
-                regexp {id=(\d+)} $line -> target_id
-            }
-        }
-        # Kill the client
-        if {$target_id ne ""} {
-            r client kill id $target_id
-        }
-        after 200
-        # Verify disconnect callback was called
-        assert_equal 1 [r reblock.get_disconnect_called]
+        wait_for_condition 50 20 {
+            set clients [r client list]
+            set target_id ""
+            foreach line [split $clients "\n"] {
+                if {[string match "*cmd=reblock.disconnect*" $line]} {
+                    regexp {id=(\d+)} $line -> target_id
+                    break
+                }
+            }
+            $target_id ne ""
+        } else {
+            fail "Failed to find client running reblock.disconnect"
+        }
+
+        assert_equal 1 [r client kill id $target_id]
+        wait_for_condition 50 20 {
+            [r reblock.get_disconnect_called] eq 1
+        } else {
+            fail "Failed waiting for disconnect callback"
+        }
         $rd close
     }
@@
     test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} {
         # Block a client with reblock that always returns REPLY_AGAIN
         # then use CLIENT UNBLOCK to abort it — should trigger timeout cb
         set rd [valkey_deferring_client]
         $rd reblock.timeout
-        after 100
-        set clients [r client list]
         set target_id ""
-        foreach line [split $clients "\n"] {
-            if {[string match "*cmd=reblock.timeout*" $line]} {
-                regexp {id=(\d+)} $line -> target_id
-            }
-        }
-        if {$target_id ne ""} {
-            r client unblock $target_id timeout
-        }
+        wait_for_condition 50 20 {
+            set clients [r client list]
+            set target_id ""
+            foreach line [split $clients "\n"] {
+                if {[string match "*cmd=reblock.timeout*" $line]} {
+                    regexp {id=(\d+)} $line -> target_id
+                    break
+                }
+            }
+            $target_id ne ""
+        } else {
+            fail "Failed to find client running reblock.timeout"
+        }
+
+        assert_equal 1 [r client unblock $target_id timeout]
         set result [$rd read]
         assert_match "*Timed out*" $result
         $rd close
     }

As per coding guidelines: "Avoid timing-dependent tests; use proper synchronization (test reliability)".

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
test {REPLY_AGAIN disconnect fires disconnect callback} {
# Start reblock.disconnect on a separate client, then kill that client.
set rd [valkey_deferring_client]
$rd reblock.disconnect
after 100
# Get the client id
set clients [r client list]
set target_id ""
foreach line [split $clients "\n"] {
if {[string match "*cmd=reblock.disconnect*" $line]} {
regexp {id=(\d+)} $line -> target_id
}
}
# Kill the client
if {$target_id ne ""} {
r client kill id $target_id
}
after 200
# Verify disconnect callback was called
assert_equal 1 [r reblock.get_disconnect_called]
$rd close
}
test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} {
# Block a client with reblock that always returns REPLY_AGAIN
# then use CLIENT UNBLOCK to abort it — should trigger timeout cb
set rd [valkey_deferring_client]
$rd reblock.timeout
after 100
set clients [r client list]
set target_id ""
foreach line [split $clients "\n"] {
if {[string match "*cmd=reblock.timeout*" $line]} {
regexp {id=(\d+)} $line -> target_id
}
}
if {$target_id ne ""} {
r client unblock $target_id timeout
}
set result [$rd read]
assert_match "*Timed out*" $result
$rd close
}
test {REPLY_AGAIN disconnect fires disconnect callback} {
# Start reblock.disconnect on a separate client, then kill that client.
set rd [valkey_deferring_client]
$rd reblock.disconnect
set target_id ""
wait_for_condition 50 20 {
set clients [r client list]
set target_id ""
foreach line [split $clients "\n"] {
if {[string match "*cmd=reblock.disconnect*" $line]} {
regexp {id=(\d+)} $line -> target_id
break
}
}
$target_id ne ""
} else {
fail "Failed to find client running reblock.disconnect"
}
assert_equal 1 [r client kill id $target_id]
wait_for_condition 50 20 {
[r reblock.get_disconnect_called] eq 1
} else {
fail "Failed waiting for disconnect callback"
}
$rd close
}
test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} {
# Block a client with reblock that always returns REPLY_AGAIN
# then use CLIENT UNBLOCK to abort it — should trigger timeout cb
set rd [valkey_deferring_client]
$rd reblock.timeout
set target_id ""
wait_for_condition 50 20 {
set clients [r client list]
set target_id ""
foreach line [split $clients "\n"] {
if {[string match "*cmd=reblock.timeout*" $line]} {
regexp {id=(\d+)} $line -> target_id
break
}
}
$target_id ne ""
} else {
fail "Failed to find client running reblock.timeout"
}
assert_equal 1 [r client unblock $target_id timeout]
set result [$rd read]
assert_match "*Timed out*" $result
$rd close
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/moduleapi/blockedclient.tcl` around lines 314 - 356, The tests
rely on sleeps and racey behavior; replace time-based waits with explicit
synchronization: after starting valkey_deferring_client and issuing
reblock.disconnect or reblock.timeout, poll r client list (the existing foreach
over split $clients) in a short loop with a timeout to wait until a line
matching "*cmd=reblock.disconnect*" or "*cmd=reblock.timeout*" yields a
non-empty target_id, then immediately perform r client kill id $target_id or r
client unblock $target_id timeout respectively; after sending unblock/kill, wait
for the specific deterministic signals using r reblock.get_disconnect_called or
the exact $rd read result (rather than fixed after N ms), and only then close
$rd—this enforces the intended unblock/kill path and removes flaky sleeps.


test "Unload the module - blockedclient" {
assert_equal {OK} [r module unload blockedclient]
}
Expand Down
Loading