diff --git a/src/module.c b/src/module.c index 4342144016d..69b1535d61c 100644 --- a/src/module.c +++ b/src/module.c @@ -8980,6 +8980,7 @@ void moduleHandleBlockedClients(void) { * the key was signaled as ready. */ long long prev_error_replies = server.stat_total_error_replies; uint64_t reply_us = 0; + int reply_ret = VALKEYMODULE_OK; if (c && !bc->blocked_on_keys && bc->reply_callback) { ValkeyModuleCtx ctx; moduleCreateContext(&ctx, bc->module, VALKEYMODULE_CTX_BLOCKED_REPLY); @@ -8989,10 +8990,26 @@ void moduleHandleBlockedClients(void) { ctx.blocked_client = bc; monotime replyTimer; elapsedStart(&replyTimer); - bc->reply_callback(&ctx, (void **)c->argv, c->argc); + reply_ret = bc->reply_callback(&ctx, (void **)c->argv, c->argc); reply_us = elapsedUs(replyTimer); moduleFreeContext(&ctx); } + + /* If reply callback returned VALKEYMODULE_REPLY_AGAIN, keep the client + * blocked. The module must not have written any reply before returning + * this value. The module will call UnblockClient again later. + * The module must handle timeout/disconnect by calling UnblockClient + * from those callbacks (standard blocked client contract). */ + if (reply_ret == VALKEYMODULE_REPLY_AGAIN) { + moduleReleaseTempClient(bc->reply_client); + moduleReleaseTempClient(bc->thread_safe_ctx_client); + bc->reply_client = moduleAllocTempClient(); + bc->thread_safe_ctx_client = moduleAllocTempClient(); + if (c) bc->reply_client->resp = c->resp; + bc->unblocked = 0; + pthread_mutex_lock(&moduleUnblockedClientsMutex); + continue; + } /* Hold onto the blocked client if module auth is in progress. The reply callback is invoked * when the client is reprocessed. */ if (c && clientHasModuleAuthInProgress(c)) { diff --git a/src/valkeymodule.h b/src/valkeymodule.h index c284e63d866..8c6571d7afd 100644 --- a/src/valkeymodule.h +++ b/src/valkeymodule.h @@ -50,6 +50,9 @@ typedef long long ustime_t; #define VALKEYMODULE_OK 0 #define VALKEYMODULE_ERR 1 +/* Return from reply callback to keep client blocked (re-block). */ +#define VALKEYMODULE_REPLY_AGAIN 2 + /* Module Based Authentication status return values. */ #define VALKEYMODULE_AUTH_HANDLED 0 #define VALKEYMODULE_AUTH_NOT_HANDLED 1 diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index e29bf71ab79..ca07d4d7c56 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -1020,6 +1020,124 @@ int unblock_by_timer(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) return VALKEYMODULE_OK; } +/* --- REPLY_AGAIN (re-block) tests --- */ + +static int reblock_reply_count = 0; +static volatile int reblock_disconnect_called = 0; + +/* Reply callback that re-blocks twice, then replies on the third call. */ +int reblock_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + reblock_reply_count++; + if (reblock_reply_count < 3) { + /* Re-block: don't write any reply */ + return VALKEYMODULE_REPLY_AGAIN; + } + /* Third time: actually reply */ + ValkeyModule_ReplyWithLongLong(ctx, reblock_reply_count); + return VALKEYMODULE_OK; +} + +void reblock_free(ValkeyModuleCtx *ctx, void *privdata) { + UNUSED(ctx); + UNUSED(privdata); +} + +void reblock_disconnect(ValkeyModuleCtx *ctx, ValkeyModuleBlockedClient *bc) { + UNUSED(ctx); + reblock_disconnect_called++; + ValkeyModule_UnblockClient(bc, NULL); +} + +static ValkeyModuleBlockedClient *reblock_bc = NULL; + +void reblock_timer_cb(ValkeyModuleCtx *ctx, void *data) { + UNUSED(data); + ValkeyModule_UnblockClient(reblock_bc, NULL); + /* If not done yet, schedule the next unblock from this timer */ + if (reblock_reply_count < 2) { + ValkeyModule_CreateTimer(ctx, 10, reblock_timer_cb, NULL); + } +} + +/* Command: reblock.test — blocks, re-blocks twice, replies on third unblock */ +int reblock_test_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + reblock_reply_count = 0; + reblock_bc = ValkeyModule_BlockClient(ctx, reblock_reply, NULL, reblock_free, 5000); + ValkeyModule_CreateTimer(ctx, 10, reblock_timer_cb, NULL); + return VALKEYMODULE_OK; +} + +/* Reply callback that always re-blocks (to test timeout while re-blocked). + * Called exactly once — the initial UnblockClient triggers it, then the client + * stays re-blocked until timeout fires. */ +static int reblock_timeout_reply_count = 0; + +int reblock_timeout_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(ctx); + UNUSED(argv); + UNUSED(argc); + reblock_timeout_reply_count++; + assert(reblock_timeout_reply_count == 1); + return VALKEYMODULE_REPLY_AGAIN; +} + +int reblock_timeout_cb(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + ValkeyModuleBlockedClient *bc = ValkeyModule_GetBlockedClientHandle(ctx); + ValkeyModule_ReplyWithSimpleString(ctx, "Timed out while re-blocked"); + ValkeyModule_UnblockClient(bc, NULL); + return VALKEYMODULE_OK; +} + +static ValkeyModuleBlockedClient *reblock_timeout_bc = NULL; + +/* Command: reblock.timeout — blocks, re-blocks forever, should timeout */ +int reblock_timeout_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + reblock_timeout_reply_count = 0; + reblock_timeout_bc = ValkeyModule_BlockClient(ctx, reblock_timeout_reply, reblock_timeout_cb, reblock_free, 500); + /* Unblock once to trigger the reply callback (which returns REPLY_AGAIN). + * After that, timeout at 500ms should fire. */ + ValkeyModule_UnblockClient(reblock_timeout_bc, NULL); + return VALKEYMODULE_OK; +} + +/* Reply callback that re-blocks until disconnect (reply cb is never called + * after disconnect since c == NULL, so this always returns REPLY_AGAIN) */ +int reblock_disconnect_reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(ctx); + UNUSED(argv); + UNUSED(argc); + return VALKEYMODULE_REPLY_AGAIN; +} + +/* Command: reblock.disconnect — blocks, re-blocks, waits for disconnect. + * Use reblock.get_disconnect_called to verify disconnect callback fired. */ +int reblock_disconnect_cmd(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + reblock_disconnect_called = 0; + ValkeyModuleBlockedClient *bc = ValkeyModule_BlockClient(ctx, reblock_disconnect_reply, NULL, reblock_free, 0); + ValkeyModule_SetDisconnectCallback(bc, reblock_disconnect); + /* Unblock once to trigger reply callback which then returns REPLY_AGAIN */ + ValkeyModule_UnblockClient(bc, NULL); + return VALKEYMODULE_OK; +} + +/* Command: reblock.get_disconnect_called — returns disconnect callback count */ +int reblock_get_disconnect_called(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + ValkeyModule_ReplyWithLongLong(ctx, reblock_disconnect_called); + return VALKEYMODULE_OK; +} + int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { VALKEYMODULE_NOT_USED(argv); VALKEYMODULE_NOT_USED(argc); @@ -1114,5 +1232,17 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg if (ValkeyModule_CreateCommand(ctx, "unblock_by_timer", unblock_by_timer, "", 0, 0, 0) == VALKEYMODULE_ERR) return VALKEYMODULE_ERR; + if (ValkeyModule_CreateCommand(ctx, "reblock.test", reblock_test_cmd, "", 0, 0, 0) == VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + + if (ValkeyModule_CreateCommand(ctx, "reblock.timeout", reblock_timeout_cmd, "", 0, 0, 0) == VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + + if (ValkeyModule_CreateCommand(ctx, "reblock.disconnect", reblock_disconnect_cmd, "", 0, 0, 0) == VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + + if (ValkeyModule_CreateCommand(ctx, "reblock.get_disconnect_called", reblock_get_disconnect_called, "", 0, 0, 0) == VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + return VALKEYMODULE_OK; } diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl index f894563b052..5f5e1d7a9d0 100644 --- a/tests/unit/moduleapi/blockedclient.tcl +++ b/tests/unit/moduleapi/blockedclient.tcl @@ -296,7 +296,46 @@ foreach call_type {nested normal} { # when the client is unlock, we will get the OK reply from timer. assert_match "OK" [r unblock_by_timer 100 100] } - + + test {REPLY_AGAIN re-blocks and eventually replies} { + # reblock.test: blocks, reply callback returns REPLY_AGAIN twice, + # then replies with the call count (3) on the third unblock. + set result [r reblock.test] + assert_equal 3 $result + } + + test {REPLY_AGAIN with timeout fires timeout callback} { + # reblock.timeout: reply callback always returns REPLY_AGAIN, + # timeout is 500ms, should get timeout response. + set result [r reblock.timeout] + assert_match "*Timed out*" $result + } + + test {REPLY_AGAIN disconnect fires disconnect callback} { + set rd [valkey_deferring_client] + $rd client id + set client_id [$rd read] + $rd reblock.disconnect + assert_equal 1 [r client kill id $client_id] + wait_for_condition 50 20 { + [r reblock.get_disconnect_called] eq 1 + } else { + fail "Failed waiting for disconnect callback" + } + $rd close + } + + test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} { + set rd [valkey_deferring_client] + $rd client id + set client_id [$rd read] + $rd reblock.timeout + assert_equal 1 [r client unblock $client_id timeout] + set result [$rd read] + assert_match "*Timed out*" $result + $rd close + } + test "Unload the module - blockedclient" { assert_equal {OK} [r module unload blockedclient] }