Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,9 @@ void debugCommand(client *c) {
} else if (!strcasecmp(objectGetVal(c->argv[1]), "client-enforce-reply-list") && c->argc == 3) {
server.debug_client_enforce_reply_list = atoi(objectGetVal(c->argv[2]));
addReply(c, shared.ok);
} else if (!strcasecmp(objectGetVal(c->argv[1]), "force-free-primary-async") && c->argc == 3) {
server.debug_force_free_primary_async = atoi(objectGetVal(c->argv[2]));
addReply(c, shared.ok);
} else if (!handleDebugClusterCommand(c)) {
addReplySubcommandSyntaxError(c);
return;
Expand Down
8 changes: 8 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2102,6 +2102,14 @@ int freeClient(client *c) {
return 0;
}

/* Debug: force async free for the primary client to deterministically
* reproduce the deferred-free replication state clobber race. */
if (server.debug_force_free_primary_async && c->flag.primary) {
server.debug_force_free_primary_async = 0;
freeClientAsync(c);
return 0;
}

/* For connected clients, call the disconnection event of modules hooks. */
if (c->conn) {
moduleFireServerEvent(VALKEYMODULE_EVENT_CLIENT_CHANGE, VALKEYMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED, c);
Expand Down
22 changes: 19 additions & 3 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -4560,15 +4560,31 @@ void replicationHandlePrimaryDisconnection(void) {
moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_DOWN, NULL);

server.primary = NULL;
server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime;

/* freeClient(primary) can be deferred via freeClientAsync when the client
* has pending IO. By the time we run in that deferred context,
* replicationUnsetPrimary()/replicationSetPrimary() may have already
* finalized replication state. Only transition to REPL_STATE_CONNECT if
* we were genuinely connected (REPL_STATE_CONNECTED) and primary_host is
* still set. Otherwise this is a stale deferred free and we must not
* clobber the current state. */
if (server.repl_state == REPL_STATE_CONNECTED && server.primary_host) {
server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime;
} else if (server.repl_state == REPL_STATE_CONNECTED) {
/* primary_host is NULL: deliberate unset in progress. */
server.repl_state = REPL_STATE_NONE;
Comment thread
ranshid marked this conversation as resolved.
}
/* Any other repl_state means the state machine already moved on
* (e.g. REPL_STATE_CONNECT, CONNECTING, NONE) — leave it untouched. */

/* We lost connection with our primary, don't disconnect replicas yet,
* maybe we'll be able to PSYNC with our primary later. We'll disconnect
* the replicas only if we'll have to do a full resync with our primary. */

/* Try to reconnect immediately rather than waiting for replicationCron:
* waiting 1 second may risk the backlog being recycled. */
if (server.primary_host) {
if (server.repl_state == REPL_STATE_CONNECT && server.primary_host) {
serverLog(LL_NOTICE, "Reconnecting to PRIMARY %s:%d", server.primary_host, server.primary_port);
connectWithPrimary();
}
Expand Down
1 change: 1 addition & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2967,6 +2967,7 @@ void initServer(void) {
server.reply_buffer_resizing_enabled = 1;
server.client_mem_usage_buckets = NULL;
server.debug_client_enforce_reply_list = 0;
server.debug_force_free_primary_async = 0;
resetReplicationBuffer();

/* Make sure the locale is set on startup based on the config file. */
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,7 @@ struct valkeyServer {
int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */
int enable_debug_assert; /* Enable debug asserts */
int debug_client_enforce_reply_list; /* Force client to always use the reply list */
int debug_force_free_primary_async; /* Force freeClient on primary to use async path */
/* Reply construction copy avoidance */
int min_io_threads_copy_avoid; /* Minimum number of IO threads for copy avoidance in reply construction */
int min_string_size_copy_avoid_threaded; /* Minimum bulk string size for copy avoidance in reply construction when IO threads enabled */
Expand Down
1 change: 1 addition & 0 deletions src/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,7 @@ static int connTLSConnect(connection *conn_,
unsigned char addr_buf[sizeof(struct in6_addr)];

if (conn->c.state != CONN_STATE_NONE) return C_ERR;
if (addr == NULL) return C_ERR;
ERR_clear_error();

/* Check whether addr is an IP address, if not, use the value for Server Name Indication */
Expand Down
48 changes: 47 additions & 1 deletion tests/unit/wait.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ tags {"wait aof network external:skip"} {
set rd [valkey_deferring_client -1]
$rd incr foo
$rd read
$rd waitaof 0 1 0
$rd waitaof 0 1 10000
wait_for_blocked_client -1
$replica replicaof $master_host $master_port
assert_equal [$rd read] {1 1}
Expand Down Expand Up @@ -532,3 +532,49 @@ start_server {} {
}
}
}

start_server {tags {"wait network external:skip"}} {
Comment thread
ranshid marked this conversation as resolved.
start_server {} {
start_server {} {
set master1 [srv -2 client]
set master1_host [srv -2 host]
set master1_port [srv -2 port]

set master2 [srv -1 client]
set master2_host [srv -1 host]
set master2_port [srv -1 port]

set replica [srv 0 client]

test {Repoint replica with deferred freeClient does not double-connect} {
    # Attach the replica to the first primary and wait until the link is up.
    $replica replicaof $master1_host $master1_port
    wait_for_condition 50 100 {
        [s 0 master_link_status] eq {up}
    } else {
        fail "Replication to master1 not established"
    }

    # Arm the debug switch so the next free of the primary client goes
    # through the async (deferred) free path.
    $replica debug force-free-primary-async 1

    # Remember the current end of the replica log, then repoint the replica
    # to the second primary and wait for the new link to come up.
    set log_offset [count_log_lines 0]
    $replica replicaof $master2_host $master2_port
    wait_for_condition 50 200 {
        [s 0 master_link_status] eq {up}
    } else {
        fail "Replication to master2 not established after repoint"
    }
    # Presumably leaves time for a stale deferred free to trigger a second,
    # unwanted connection attempt before the log is inspected.
    after 2000

    # Only look at log output produced after the repoint was issued.
    set first_new_line [expr {$log_offset + 1}]
    set new_log_text [exec tail -n +$first_new_line < [srv 0 stdout]]

    # Count the connection attempts made towards the second primary.
    set connect_pattern "*Connecting to PRIMARY $master2_host:$master2_port*"
    set connect_attempts 0
    foreach entry [split $new_log_text "\n"] {
        if {[string match $connect_pattern $entry]} {
            incr connect_attempts
        }
    }

    # Exactly one attempt is expected: the repoint itself.
    assert_equal 1 $connect_attempts
}
}
}
}
Loading