Support VALKEYMODULE_REPLY_AGAIN in VM_UnblockClient Reply cb#3707
KarthikSubbarao wants to merge 3 commits into
Conversation
Signed-off-by: Karthik Subbarao <karthikrs2021@gmail.com>
Important: Review skipped. Draft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration.
📝 Walkthrough: This PR introduces the blocked client re-blocking feature.
🎯 Review complexity: 3 (Moderate) | ⏱️ Estimated effort: ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (warning)
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/module.c`:
- Around line 9002-9005: The code releases and re-allocates
bc->thread_safe_ctx_client across VALKEYMODULE_REPLY_AGAIN which invalidates
ctx->client returned by VM_GetThreadSafeContext and can cause use-after-free;
fix by not recycling or replacing bc->thread_safe_ctx_client in this path —
remove or skip the moduleReleaseTempClient/moduleAllocTempClient calls for
bc->thread_safe_ctx_client here and only touch bc->reply_client (or allocate a
new thread-safe client per VM_GetThreadSafeContext and ensure the context
owns/releases it) so outstanding thread-safe contexts keep a stable client
pointer.
- Around line 9002-9005: When replacing the temporary clients in the reblock
path, copy the original client's RESP mode into the newly allocated reply client
so buffered replies use the correct RESP version: after calling
moduleReleaseTempClient and moduleAllocTempClient for bc->reply_client (and
similarly for bc->thread_safe_ctx_client if needed), set bc->reply_client->resp
= bc->client->resp (or use the appropriate accessor on the new temp client) to
restore the RESP mode that was set earlier in the initial block path.
In `@tests/unit/moduleapi/blockedclient.tcl`:
- Around line 314-356: The tests rely on sleeps and racey behavior; replace
time-based waits with explicit synchronization: after starting
valkey_deferring_client and issuing reblock.disconnect or reblock.timeout, poll
r client list (the existing foreach over split $clients) in a short loop with a
timeout to wait until a line matching "*cmd=reblock.disconnect*" or
"*cmd=reblock.timeout*" yields a non-empty target_id, then immediately perform r
client kill id $target_id or r client unblock $target_id timeout respectively;
after sending unblock/kill, wait for the specific deterministic signals using r
reblock.get_disconnect_called or the exact $rd read result (rather than fixed
after N ms), and only then close $rd—this enforces the intended unblock/kill
path and removes flaky sleeps.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 639266a1-164f-440e-a2ce-4dd997e3d65c
📒 Files selected for processing (4)
src/module.c
src/valkeymodule.h
tests/modules/blockedclient.c
tests/unit/moduleapi/blockedclient.tcl
```c
moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);
bc->reply_client = moduleAllocTempClient();
bc->thread_safe_ctx_client = moduleAllocTempClient();
```
Avoid recycling thread_safe_ctx_client across VALKEYMODULE_REPLY_AGAIN.
VM_GetThreadSafeContext() stores bc->thread_safe_ctx_client into ctx->client. Replacing that client here means any outstanding thread-safe contexts created before the first unblock now hold a dangling client pointer. If a worker thread keeps using the same context after returning VALKEYMODULE_REPLY_AGAIN, this turns into a use-after-free / cross-request state corruption bug.
As per coding guidelines, "Verify thread safety in threaded I/O paths in C code".
Restore the reply client's RESP mode after reblocking.
The initial block path copies bc->client->resp into bc->reply_client, but this branch allocates a fresh pooled client without doing the same. Any buffered reply emitted before the next unblock can then be serialized in the wrong RESP version.
Proposed fix

```diff
 if (reply_ret == VALKEYMODULE_REPLY_AGAIN) {
     moduleReleaseTempClient(bc->reply_client);
     moduleReleaseTempClient(bc->thread_safe_ctx_client);
     bc->reply_client = moduleAllocTempClient();
     bc->thread_safe_ctx_client = moduleAllocTempClient();
+    if (bc->client) bc->reply_client->resp = bc->client->resp;
     bc->unblocked = 0;
     pthread_mutex_lock(&moduleUnblockedClientsMutex);
     continue;
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```c
if (reply_ret == VALKEYMODULE_REPLY_AGAIN) {
    moduleReleaseTempClient(bc->reply_client);
    moduleReleaseTempClient(bc->thread_safe_ctx_client);
    bc->reply_client = moduleAllocTempClient();
    bc->thread_safe_ctx_client = moduleAllocTempClient();
    if (bc->client) bc->reply_client->resp = bc->client->resp;
    bc->unblocked = 0;
    pthread_mutex_lock(&moduleUnblockedClientsMutex);
    continue;
}
```
```tcl
test {REPLY_AGAIN disconnect fires disconnect callback} {
    # Start reblock.disconnect on a separate client, then kill that client.
    set rd [valkey_deferring_client]
    $rd reblock.disconnect
    after 100
    # Get the client id
    set clients [r client list]
    set target_id ""
    foreach line [split $clients "\n"] {
        if {[string match "*cmd=reblock.disconnect*" $line]} {
            regexp {id=(\d+)} $line -> target_id
        }
    }
    # Kill the client
    if {$target_id ne ""} {
        r client kill id $target_id
    }
    after 200
    # Verify disconnect callback was called
    assert_equal 1 [r reblock.get_disconnect_called]
    $rd close
}

test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} {
    # Block a client with reblock that always returns REPLY_AGAIN
    # then use CLIENT UNBLOCK to abort it — should trigger timeout cb
    set rd [valkey_deferring_client]
    $rd reblock.timeout
    after 100
    set clients [r client list]
    set target_id ""
    foreach line [split $clients "\n"] {
        if {[string match "*cmd=reblock.timeout*" $line]} {
            regexp {id=(\d+)} $line -> target_id
        }
    }
    if {$target_id ne ""} {
        r client unblock $target_id timeout
    }
    set result [$rd read]
    assert_match "*Timed out*" $result
    $rd close
}
```
Make the re-block integration tests deterministic and enforce the intended path.
These tests currently rely on fixed sleeps and optional client kill/unblock, which can make them flaky and can let the CLIENT UNBLOCK case pass via natural timeout instead of the unblock path.
💡 Proposed fix

```diff
@@
 test {REPLY_AGAIN disconnect fires disconnect callback} {
     # Start reblock.disconnect on a separate client, then kill that client.
     set rd [valkey_deferring_client]
     $rd reblock.disconnect
-    after 100
-    # Get the client id
-    set clients [r client list]
     set target_id ""
-    foreach line [split $clients "\n"] {
-        if {[string match "*cmd=reblock.disconnect*" $line]} {
-            regexp {id=(\d+)} $line -> target_id
-        }
-    }
-    # Kill the client
-    if {$target_id ne ""} {
-        r client kill id $target_id
-    }
-    after 200
-    # Verify disconnect callback was called
-    assert_equal 1 [r reblock.get_disconnect_called]
+    wait_for_condition 50 20 {
+        set clients [r client list]
+        set target_id ""
+        foreach line [split $clients "\n"] {
+            if {[string match "*cmd=reblock.disconnect*" $line]} {
+                regexp {id=(\d+)} $line -> target_id
+                break
+            }
+        }
+        $target_id ne ""
+    } else {
+        fail "Failed to find client running reblock.disconnect"
+    }
+
+    assert_equal 1 [r client kill id $target_id]
+    wait_for_condition 50 20 {
+        [r reblock.get_disconnect_called] eq 1
+    } else {
+        fail "Failed waiting for disconnect callback"
+    }
     $rd close
 }
@@
 test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} {
     # Block a client with reblock that always returns REPLY_AGAIN
     # then use CLIENT UNBLOCK to abort it — should trigger timeout cb
     set rd [valkey_deferring_client]
     $rd reblock.timeout
-    after 100
-    set clients [r client list]
     set target_id ""
-    foreach line [split $clients "\n"] {
-        if {[string match "*cmd=reblock.timeout*" $line]} {
-            regexp {id=(\d+)} $line -> target_id
-        }
-    }
-    if {$target_id ne ""} {
-        r client unblock $target_id timeout
-    }
+    wait_for_condition 50 20 {
+        set clients [r client list]
+        set target_id ""
+        foreach line [split $clients "\n"] {
+            if {[string match "*cmd=reblock.timeout*" $line]} {
+                regexp {id=(\d+)} $line -> target_id
+                break
+            }
+        }
+        $target_id ne ""
+    } else {
+        fail "Failed to find client running reblock.timeout"
+    }
+
+    assert_equal 1 [r client unblock $target_id timeout]
     set result [$rd read]
     assert_match "*Timed out*" $result
     $rd close
 }
```

As per coding guidelines: "Avoid timing-dependent tests; use proper synchronization (test reliability)".
📝 Committable suggestion
```tcl
test {REPLY_AGAIN disconnect fires disconnect callback} {
    # Start reblock.disconnect on a separate client, then kill that client.
    set rd [valkey_deferring_client]
    $rd reblock.disconnect
    set target_id ""
    wait_for_condition 50 20 {
        set clients [r client list]
        set target_id ""
        foreach line [split $clients "\n"] {
            if {[string match "*cmd=reblock.disconnect*" $line]} {
                regexp {id=(\d+)} $line -> target_id
                break
            }
        }
        $target_id ne ""
    } else {
        fail "Failed to find client running reblock.disconnect"
    }

    assert_equal 1 [r client kill id $target_id]
    wait_for_condition 50 20 {
        [r reblock.get_disconnect_called] eq 1
    } else {
        fail "Failed waiting for disconnect callback"
    }
    $rd close
}

test {REPLY_AGAIN CLIENT UNBLOCK triggers timeout callback} {
    # Block a client with reblock that always returns REPLY_AGAIN
    # then use CLIENT UNBLOCK to abort it — should trigger timeout cb
    set rd [valkey_deferring_client]
    $rd reblock.timeout
    set target_id ""
    wait_for_condition 50 20 {
        set clients [r client list]
        set target_id ""
        foreach line [split $clients "\n"] {
            if {[string match "*cmd=reblock.timeout*" $line]} {
                regexp {id=(\d+)} $line -> target_id
                break
            }
        }
        $target_id ne ""
    } else {
        fail "Failed to find client running reblock.timeout"
    }

    assert_equal 1 [r client unblock $target_id timeout]
    set result [$rd read]
    assert_match "*Timed out*" $result
    $rd close
}
```
Codecov Report
❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff            @@
##           unstable   #3707      +/-  ##
============================================
- Coverage     76.71%   76.51%   -0.21%
============================================
  Files           162      162
  Lines         80662    80672      +10
============================================
- Hits          61881    61725     -156
- Misses        18781    18947     +166
```
Please note this is a draft PR, as I am working on performance benchmarking of Valkey Core + Valkey Search. Do not review yet. I will update the status of the PR once it is ready.
Today, a module that blocks a client cannot keep it blocked after the reply callback is invoked. The core always unblocks the client and frees the blocked client handle after the reply callback returns. This forces modules that need to do main-thread work after unblock (like valkey-search's content resolution) to use `EventLoopAddOneShot` to schedule that work, adding an extra event loop iteration of latency per operation.

The fix adds a new return value, `VALKEYMODULE_REPLY_AGAIN` (value 2). When the reply callback returns this, the core keeps the client blocked with its privdata intact. The module calls `UnblockClient` again later to re-trigger the reply callback. This is not a breaking change — the reply callback already returns `int`, and existing modules return 0 or 1.

I am leaning towards this over a separate `VM_ReblockClient()` API (or a new blocking framework) because the blocked client already has all the necessary state (privdata, callbacks, timeout). No new state machine or lifecycle management is needed — just a signal that says "I'm not done yet."

Please note that I need to review the core module.c code and ensure the core change correctly works with the rest of the module blocking framework. Here is what I did so far and tested:

- `bc->unblocked` is reset to 0 so the timeout mechanism continues to work

Tested with valkey-search (10k docs, `FT.SEARCH "common" LIMIT 0 10`). Both before and after include the valkey-search PausePoint + DecrementRefCount fixes from PR 1037.

The fix helps most when content resolution involves real work (multiple fields, larger data). With trivial content (1 small field), the event loop hop is negligible. With 5 fields × 1KB, the scheduling overhead becomes a significant fraction of per-query latency — especially at low concurrency, where each query pays the full round-trip cost.
Prior to the fix, the main thread is mostly idle between `EventLoopAddOneShot` callbacks — the scheduling hop is the bottleneck at low concurrency.

After the fix, the main thread does the content fetch + reply inline in the reply callback. No scheduling hop.
Tests added to `tests/modules/blockedclient.c`:

- `reblock.test` — re-blocks twice, replies on the third unblock
- `reblock.timeout` — re-blocks forever, verifies the timeout callback fires
- `reblock.disconnect` — re-blocks, client is killed, verifies the disconnect callback fires
- `CLIENT UNBLOCK` on a re-blocked client — verifies abort triggers the timeout callback