From 1aee43809bdff7a50e779af864f2bc7e82210ed1 Mon Sep 17 00:00:00 2001 From: Karthik Subbarao Date: Thu, 14 May 2026 08:17:57 +0000 Subject: [PATCH 1/3] Support VALKEYMODULE_REPLY_AGAIN in VM_UnblockClient Reply cb Signed-off-by: Karthik Subbarao --- src/module.c | 16 ++- src/valkeymodule.h | 3 + tests/modules/blockedclient.c | 142 +++++++++++++++++++++++++ tests/unit/moduleapi/blockedclient.tcl | 60 ++++++++++- 4 files changed, 219 insertions(+), 2 deletions(-) diff --git a/src/module.c b/src/module.c index 4342144016d..592c0360566 100644 --- a/src/module.c +++ b/src/module.c @@ -8980,6 +8980,7 @@ void moduleHandleBlockedClients(void) { * the key was signaled as ready. */ long long prev_error_replies = server.stat_total_error_replies; uint64_t reply_us = 0; + int reply_ret = VALKEYMODULE_OK; if (c && !bc->blocked_on_keys && bc->reply_callback) { ValkeyModuleCtx ctx; moduleCreateContext(&ctx, bc->module, VALKEYMODULE_CTX_BLOCKED_REPLY); @@ -8989,10 +8990,23 @@ void moduleHandleBlockedClients(void) { ctx.blocked_client = bc; monotime replyTimer; elapsedStart(&replyTimer); - bc->reply_callback(&ctx, (void **)c->argv, c->argc); + reply_ret = bc->reply_callback(&ctx, (void **)c->argv, c->argc); reply_us = elapsedUs(replyTimer); moduleFreeContext(&ctx); } + + /* If reply callback returned VALKEYMODULE_REPLY_AGAIN, keep the client + * blocked. The module must not have written any reply before returning + * this value. The module will call UnblockClient again later. */ + if (reply_ret == VALKEYMODULE_REPLY_AGAIN) { + moduleReleaseTempClient(bc->reply_client); + moduleReleaseTempClient(bc->thread_safe_ctx_client); + bc->reply_client = moduleAllocTempClient(); + bc->thread_safe_ctx_client = moduleAllocTempClient(); + bc->unblocked = 0; + pthread_mutex_lock(&moduleUnblockedClientsMutex); + continue; + } /* Hold onto the blocked client if module auth is in progress. The reply callback is invoked * when the client is reprocessed. */ if (c && clientHasModuleAuthInProgress(c)) { diff --git a/src/valkeymodule.h b/src/valkeymodule.h index c284e63d866..8c6571d7afd 100644 --- a/src/valkeymodule.h +++ b/src/valkeymodule.h @@ -50,6 +50,9 @@ typedef long long ustime_t; #define VALKEYMODULE_OK 0 #define VALKEYMODULE_ERR 1 +/* Return from reply callback to keep client blocked (re-block). */ +#define VALKEYMODULE_REPLY_AGAIN 2 + /* Module Based Authentication status return values. */ #define VALKEYMODULE_AUTH_HANDLED 0 #define VALKEYMODULE_AUTH_NOT_HANDLED 1 diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index e29bf71ab79..8b72e04a178 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -1020,6 +1020,133 @@ int unblock_by_timer(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) return VALKEYMODULE_OK; } +/* --- REPLY_AGAIN (re-block) tests --- */ + +static int reblock_reply_count = 0; +static volatile int reblock_disconnect_called = 0; +static volatile int reblock_free_called = 0; + +/* Reply callback that re-blocks twice, then replies on the third call. */ +int reblock_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + reblock_reply_count++; + if (reblock_reply_count < 3) { + /* Re-block: don't write any reply */ + return VALKEYMODULE_REPLY_AGAIN; + } + /* Third time: actually reply */ + ValkeyModule_ReplyWithLongLong(ctx, reblock_reply_count); + return VALKEYMODULE_OK; +} + +void reblock_free(ValkeyModuleCtx *ctx, void *privdata) { + UNUSED(ctx); + UNUSED(privdata); + reblock_free_called++; +} + +void reblock_disconnect(ValkeyModuleCtx *ctx, ValkeyModuleBlockedClient *bc) { + UNUSED(ctx); + reblock_disconnect_called++; + ValkeyModule_UnblockClient(bc, NULL); +} + +static ValkeyModuleBlockedClient *reblock_bc = NULL; + +void reblock_timer_cb(ValkeyModuleCtx *ctx, void *data) { + UNUSED(ctx); + UNUSED(data); + /* Each timer fire triggers one UnblockClient. The reply callback + * returns REPLY_AGAIN for the first two, then replies on the third. */ + ValkeyModule_UnblockClient(reblock_bc, NULL); +} + +/* Command: reblock.test — blocks, re-blocks twice, replies on third unblock */ +int reblock_test_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + reblock_reply_count = 0; + reblock_bc = ValkeyModule_BlockClient(ctx, reblock_reply, NULL, reblock_free, 5000); + /* Schedule 3 timer callbacks at 10ms intervals */ + ValkeyModule_CreateTimer(ctx, 10, reblock_timer_cb, NULL); + ValkeyModule_CreateTimer(ctx, 30, reblock_timer_cb, NULL); + ValkeyModule_CreateTimer(ctx, 50, reblock_timer_cb, NULL); + return VALKEYMODULE_OK; +} + +/* Reply callback that always re-blocks (to test timeout while re-blocked) */ +int reblock_timeout_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(ctx); + UNUSED(argv); + UNUSED(argc); + return VALKEYMODULE_REPLY_AGAIN; +} + +int reblock_timeout_cb(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + ValkeyModule_ReplyWithSimpleString(ctx, "Timed out while re-blocked"); + return VALKEYMODULE_OK; +} + +static ValkeyModuleBlockedClient *reblock_timeout_bc = NULL; + +void reblock_timeout_timer_cb(ValkeyModuleCtx *ctx, void *data) { + UNUSED(ctx); + UNUSED(data); + ValkeyModule_UnblockClient(reblock_timeout_bc, NULL); +} + +/* Command: reblock.timeout — blocks, re-blocks forever, should timeout */ +int reblock_timeout_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + reblock_timeout_bc = ValkeyModule_BlockClient(ctx, reblock_timeout_reply, reblock_timeout_cb, reblock_free, 500); + /* Unblock once to trigger the reply callback (which returns REPLY_AGAIN). + * After that, timeout at 500ms should fire. */ + ValkeyModule_CreateTimer(ctx, 10, reblock_timeout_timer_cb, NULL); + return VALKEYMODULE_OK; +} + +/* Reply callback that always re-blocks (for disconnect test) */ +int reblock_disconnect_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(ctx); + UNUSED(argv); + UNUSED(argc); + return VALKEYMODULE_REPLY_AGAIN; +} + +/* Command: reblock.disconnect — blocks, re-blocks, waits for disconnect. + * Use reblock.get_disconnect_called to verify disconnect callback fired. */ +int reblock_disconnect_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + reblock_disconnect_called = 0; + reblock_free_called = 0; + ValkeyModuleBlockedClient *bc = ValkeyModule_BlockClient(ctx, reblock_disconnect_reply, NULL, reblock_free, 0); + ValkeyModule_SetDisconnectCallback(bc, reblock_disconnect); + /* Unblock once to trigger reply callback which returns REPLY_AGAIN */ + ValkeyModule_UnblockClient(bc, NULL); + return VALKEYMODULE_OK; +} + +/* Command: reblock.get_disconnect_called — returns disconnect callback count */ +int reblock_get_disconnect_called(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + ValkeyModule_ReplyWithLongLong(ctx, reblock_disconnect_called); + return VALKEYMODULE_OK; +} + +/* Command: reblock.get_free_called — returns free callback count */ +int reblock_get_free_called(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + ValkeyModule_ReplyWithLongLong(ctx, reblock_free_called); + return VALKEYMODULE_OK; +} + int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { VALKEYMODULE_NOT_USED(argv); VALKEYMODULE_NOT_USED(argc); @@ -1114,5 +1241,20 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg if (ValkeyModule_CreateCommand(ctx, "unblock_by_timer", unblock_by_timer, "", 0, 0, 0) == VALKEYMODULE_ERR) return VALKEYMODULE_ERR; + if (ValkeyModule_CreateCommand(ctx, "reblock.test", reblock_test_cmd, "", 0, 0, 0) == VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + + if (ValkeyModule_CreateCommand(ctx, "reblock.timeout", reblock_timeout_cmd, "", 0, 0, 0) == VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + + if (ValkeyModule_CreateCommand(ctx, "reblock.disconnect", reblock_disconnect_cmd, "", 0, 0, 0) == VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + + if (ValkeyModule_CreateCommand(ctx, "reblock.get_disconnect_called", reblock_get_disconnect_called, "", 0, 0, 0) == VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + + if (ValkeyModule_CreateCommand(ctx, "reblock.get_free_called", reblock_get_free_called, "", 0, 0, 0) == VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + return VALKEYMODULE_OK; } diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl index f894563b052..6151b0c3890 100644 --- a/tests/unit/moduleapi/blockedclient.tcl +++ b/tests/unit/moduleapi/blockedclient.tcl @@ -296,7 +296,65 @@ foreach call_type {nested normal} { # when the client is unlock, we will get the OK reply from timer. assert_match "OK" [r unblock_by_timer 100 100] } - + + test {REPLY_AGAIN re-blocks and eventually replies} { + # reblock.test: blocks, reply callback returns REPLY_AGAIN twice, + # then replies with the call count (3) on the third unblock. + set result [r reblock.test] + assert_equal 3 $result + } + + test {REPLY_AGAIN with timeout fires timeout callback} { + # reblock.timeout: reply callback always returns REPLY_AGAIN, + # timeout is 500ms, should get timeout response. + set result [r reblock.timeout] + assert_match "*Timed out*" $result + } + + test {REPLY_AGAIN disconnect fires disconnect callback} { + # Start reblock.disconnect on a separate client, then kill that client. + set rd [valkey_deferring_client] + $rd reblock.disconnect + after 100 + # Get the client id + set clients [r client list] + set target_id "" + foreach line [split $clients "\n"] { + if {[string match "*cmd=reblock.disconnect*" $line]} { + regexp {id=(\d+)} $line -> target_id + } + } + # Kill the client + if {$target_id ne ""} { + r client kill id $target_id + } + after 200 + # Verify disconnect callback was called + assert_equal 1 [r reblock.get_disconnect_called] + $rd close + } + + test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} { + # Block a client with reblock that always returns REPLY_AGAIN + # then use CLIENT UNBLOCK to abort it — should trigger timeout cb + set rd [valkey_deferring_client] + $rd reblock.timeout + after 100 + set clients [r client list] + set target_id "" + foreach line [split $clients "\n"] { + if {[string match "*cmd=reblock.timeout*" $line]} { + regexp {id=(\d+)} $line -> target_id + } + } + if {$target_id ne ""} { + r client unblock $target_id timeout + } + set result [$rd read] + assert_match "*Timed out*" $result + $rd close + } + test "Unload the module - blockedclient" { assert_equal {OK} [r module unload blockedclient] } From a8a75f42911c27d7fc3a1fe9361743ddb941c36a Mon Sep 17 00:00:00 2001 From: Karthik Subbarao Date: Thu, 14 May 2026 21:21:54 +0000 Subject: [PATCH 2/3] WIP Signed-off-by: Karthik Subbarao --- src/module.c | 5 ++- tests/modules/blockedclient.c | 46 ++++++++++---------------- tests/unit/moduleapi/blockedclient.tcl | 39 ++++++---------------- 3 files changed, 31 insertions(+), 59 deletions(-) diff --git a/src/module.c b/src/module.c index 592c0360566..69b1535d61c 100644 --- a/src/module.c +++ b/src/module.c @@ -8997,12 +8997,15 @@ void moduleHandleBlockedClients(void) { /* If reply callback returned VALKEYMODULE_REPLY_AGAIN, keep the client * blocked. The module must not have written any reply before returning - * this value. The module will call UnblockClient again later. */ + * this value. The module will call UnblockClient again later. + * The module must handle timeout/disconnect by calling UnblockClient + * from those callbacks (standard blocked client contract). */ if (reply_ret == VALKEYMODULE_REPLY_AGAIN) { moduleReleaseTempClient(bc->reply_client); moduleReleaseTempClient(bc->thread_safe_ctx_client); bc->reply_client = moduleAllocTempClient(); bc->thread_safe_ctx_client = moduleAllocTempClient(); + if (c) bc->reply_client->resp = c->resp; bc->unblocked = 0; pthread_mutex_lock(&moduleUnblockedClientsMutex); continue; diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index 8b72e04a178..093aad5b9fb 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -1024,7 +1024,6 @@ int unblock_by_timer(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) static int reblock_reply_count = 0; static volatile int reblock_disconnect_called = 0; -static volatile int reblock_free_called = 0; /* Reply callback that re-blocks twice, then replies on the third call. */ int reblock_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { @@ -1043,7 +1042,6 @@ int reblock_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { void reblock_free(ValkeyModuleCtx *ctx, void *privdata) { UNUSED(ctx); UNUSED(privdata); - reblock_free_called++; } void reblock_disconnect(ValkeyModuleCtx *ctx, ValkeyModuleBlockedClient *bc) { @@ -1055,11 +1053,12 @@ void reblock_disconnect(ValkeyModuleCtx *ctx, ValkeyModuleBlockedClient *bc) { static ValkeyModuleBlockedClient *reblock_bc = NULL; void reblock_timer_cb(ValkeyModuleCtx *ctx, void *data) { - UNUSED(ctx); UNUSED(data); - /* Each timer fire triggers one UnblockClient. The reply callback - * returns REPLY_AGAIN for the first two, then replies on the third. */ ValkeyModule_UnblockClient(reblock_bc, NULL); + /* If not done yet, schedule the next unblock from this timer */ + if (reblock_reply_count < 2) { + ValkeyModule_CreateTimer(ctx, 10, reblock_timer_cb, NULL); + } } /* Command: reblock.test — blocks, re-blocks twice, replies on third unblock */ @@ -1068,48 +1067,49 @@ int reblock_test_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) UNUSED(argc); reblock_reply_count = 0; reblock_bc = ValkeyModule_BlockClient(ctx, reblock_reply, NULL, reblock_free, 5000); - /* Schedule 3 timer callbacks at 10ms intervals */ ValkeyModule_CreateTimer(ctx, 10, reblock_timer_cb, NULL); - ValkeyModule_CreateTimer(ctx, 30, reblock_timer_cb, NULL); - ValkeyModule_CreateTimer(ctx, 50, reblock_timer_cb, NULL); return VALKEYMODULE_OK; } -/* Reply callback that always re-blocks (to test timeout while re-blocked) */ +/* Reply callback that always re-blocks (to test timeout while re-blocked). + * Called exactly once — the initial UnblockClient triggers it, then the client + * stays re-blocked until timeout fires. */ +static int reblock_timeout_reply_count = 0; + int reblock_timeout_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { UNUSED(ctx); UNUSED(argv); UNUSED(argc); + reblock_timeout_reply_count++; + assert(reblock_timeout_reply_count == 1); return VALKEYMODULE_REPLY_AGAIN; } int reblock_timeout_cb(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { UNUSED(argv); UNUSED(argc); + ValkeyModuleBlockedClient *bc = ValkeyModule_GetBlockedClientHandle(ctx); ValkeyModule_ReplyWithSimpleString(ctx, "Timed out while re-blocked"); + ValkeyModule_UnblockClient(bc, NULL); return VALKEYMODULE_OK; } static ValkeyModuleBlockedClient *reblock_timeout_bc = NULL; -void reblock_timeout_timer_cb(ValkeyModuleCtx *ctx, void *data) { - UNUSED(ctx); - UNUSED(data); - ValkeyModule_UnblockClient(reblock_timeout_bc, NULL); -} - /* Command: reblock.timeout — blocks, re-blocks forever, should timeout */ int reblock_timeout_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { UNUSED(argv); UNUSED(argc); + reblock_timeout_reply_count = 0; reblock_timeout_bc = ValkeyModule_BlockClient(ctx, reblock_timeout_reply, reblock_timeout_cb, reblock_free, 500); /* Unblock once to trigger the reply callback (which returns REPLY_AGAIN). * After that, timeout at 500ms should fire. */ - ValkeyModule_CreateTimer(ctx, 10, reblock_timeout_timer_cb, NULL); + ValkeyModule_UnblockClient(reblock_timeout_bc, NULL); return VALKEYMODULE_OK; } -/* Reply callback that always re-blocks (for disconnect test) */ +/* Reply callback that re-blocks until disconnect (reply cb is never called + * after disconnect since c == NULL, so this always returns REPLY_AGAIN) */ int reblock_disconnect_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { UNUSED(ctx); UNUSED(argv); @@ -1123,7 +1123,6 @@ int reblock_disconnect_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int UNUSED(argv); UNUSED(argc); reblock_disconnect_called = 0; - reblock_free_called = 0; ValkeyModuleBlockedClient *bc = ValkeyModule_BlockClient(ctx, reblock_disconnect_reply, NULL, reblock_free, 0); ValkeyModule_SetDisconnectCallback(bc, reblock_disconnect); /* Unblock once to trigger reply callback which returns REPLY_AGAIN */ @@ -1139,14 +1138,6 @@ int reblock_get_disconnect_called(ValkeyModuleCtx *ctx, ValkeyModuleString **arg return VALKEYMODULE_OK; } -/* Command: reblock.get_free_called — returns free callback count */ -int reblock_get_free_called(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { - UNUSED(argv); - UNUSED(argc); - ValkeyModule_ReplyWithLongLong(ctx, reblock_free_called); - return VALKEYMODULE_OK; -} - int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { VALKEYMODULE_NOT_USED(argv); VALKEYMODULE_NOT_USED(argc); @@ -1253,8 +1244,5 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg if (ValkeyModule_CreateCommand(ctx, "reblock.get_disconnect_called", reblock_get_disconnect_called, "", 0, 0, 0) == VALKEYMODULE_ERR) return VALKEYMODULE_ERR; - if (ValkeyModule_CreateCommand(ctx, "reblock.get_free_called", reblock_get_free_called, "", 0, 0, 0) == VALKEYMODULE_ERR) - return VALKEYMODULE_ERR; - return VALKEYMODULE_OK; } diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl index 6151b0c3890..5f5e1d7a9d0 100644 --- a/tests/unit/moduleapi/blockedclient.tcl +++ b/tests/unit/moduleapi/blockedclient.tcl @@ -312,44 +312,25 @@ foreach call_type {nested normal} { } test {REPLY_AGAIN disconnect fires disconnect callback} { - # Start reblock.disconnect on a separate client, then kill that client. set rd [valkey_deferring_client] + $rd client id + set client_id [$rd read] $rd reblock.disconnect - after 100 - # Get the client id - set clients [r client list] - set target_id "" - foreach line [split $clients "\n"] { - if {[string match "*cmd=reblock.disconnect*" $line]} { - regexp {id=(\d+)} $line -> target_id - } - } - # Kill the client - if {$target_id ne ""} { - r client kill id $target_id + assert_equal 1 [r client kill id $client_id] + wait_for_condition 50 20 { + [r reblock.get_disconnect_called] eq 1 + } else { + fail "Failed waiting for disconnect callback" } - after 200 - # Verify disconnect callback was called - assert_equal 1 [r reblock.get_disconnect_called] $rd close } test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} { - # Block a client with reblock that always returns REPLY_AGAIN - # then use CLIENT UNBLOCK to abort it — should trigger timeout cb set rd [valkey_deferring_client] + $rd client id + set client_id [$rd read] $rd reblock.timeout - after 100 - set clients [r client list] - set target_id "" - foreach line [split $clients "\n"] { - if {[string match "*cmd=reblock.timeout*" $line]} { - regexp {id=(\d+)} $line -> target_id - } - } - if {$target_id ne ""} { - r client unblock $target_id timeout - } + assert_equal 1 [r client unblock $client_id timeout] set result [$rd read] assert_match "*Timed out*" $result $rd close From 206b17d1b49b4675bcf68e03a85a0843b0632c8e Mon Sep 17 00:00:00 2001 From: Karthik Subbarao Date: Thu, 14 May 2026 21:21:54 +0000 Subject: [PATCH 3/3] WIP Signed-off-by: Karthik Subbarao --- tests/modules/blockedclient.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index 093aad5b9fb..ca07d4d7c56 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -1125,7 +1125,7 @@ int reblock_disconnect_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int reblock_disconnect_called = 0; ValkeyModuleBlockedClient *bc = ValkeyModule_BlockClient(ctx, reblock_disconnect_reply, NULL, reblock_free, 0); ValkeyModule_SetDisconnectCallback(bc, reblock_disconnect); - /* Unblock once to trigger reply callback which returns REPLY_AGAIN */ + /* Unblock once to trigger reply callback which then returns REPLY_AGAIN */ ValkeyModule_UnblockClient(bc, NULL); return VALKEYMODULE_OK; }