Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,9 @@ void debugCommand(client *c) {
} else if (!strcasecmp(objectGetVal(c->argv[1]), "client-enforce-reply-list") && c->argc == 3) {
server.debug_client_enforce_reply_list = atoi(objectGetVal(c->argv[2]));
addReply(c, shared.ok);
} else if (!strcasecmp(objectGetVal(c->argv[1]), "force-free-primary-async") && c->argc == 3) {
server.debug_force_free_primary_async = atoi(objectGetVal(c->argv[2]));
addReply(c, shared.ok);
} else if (!handleDebugClusterCommand(c)) {
addReplySubcommandSyntaxError(c);
return;
Expand Down
8 changes: 8 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2102,6 +2102,14 @@ int freeClient(client *c) {
return 0;
}

/* Debug: force async free for the primary client to deterministically
* reproduce the deferred-free replication state clobber race. */
if (server.debug_force_free_primary_async && c->flag.primary) {
server.debug_force_free_primary_async = 0;
freeClientAsync(c);
return 0;
}

/* For connected clients, call the disconnection event of modules hooks. */
if (c->conn) {
moduleFireServerEvent(VALKEYMODULE_EVENT_CLIENT_CHANGE, VALKEYMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED, c);
Expand Down
23 changes: 20 additions & 3 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -4560,15 +4560,32 @@ void replicationHandlePrimaryDisconnection(void) {
moduleFireServerEvent(VALKEYMODULE_EVENT_PRIMARY_LINK_CHANGE, VALKEYMODULE_SUBEVENT_PRIMARY_LINK_DOWN, NULL);

server.primary = NULL;
server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime;

/* freeClient(primary) can be deferred via freeClientAsync when the client
* has pending IO. By the time we run in that deferred context,
* replicationUnsetPrimary()/replicationSetPrimary() may have already
* finalized replication state. Only transition to REPL_STATE_CONNECT if
* we were genuinely connected (REPL_STATE_CONNECTED) and primary_host is
* still set. Otherwise this is a stale deferred free and we must not
* clobber the current state. */
if (server.repl_state == REPL_STATE_CONNECTED && server.primary_host) {
server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime;
} else if (server.repl_state == REPL_STATE_CONNECTED) {
/* primary_host is NULL: deliberate unset in progress. */
server.repl_state = REPL_STATE_NONE;
Comment thread
ranshid marked this conversation as resolved.
server.repl_down_since = server.unixtime;
}
/* Any other repl_state means the state machine already moved on
* (e.g. REPL_STATE_CONNECT, CONNECTING, NONE) — leave it untouched. */

/* We lost connection with our primary, don't disconnect replicas yet,
* maybe we'll be able to PSYNC with our primary later. We'll disconnect
* the replicas only if we'll have to do a full resync with our primary. */

/* Try to reconnect immediately rather than waiting for replicationCron:
* waiting 1 second may risk the backlog being recycled. */
if (server.primary_host) {
if (server.repl_state == REPL_STATE_CONNECT && server.primary_host) {
serverLog(LL_NOTICE, "Reconnecting to PRIMARY %s:%d", server.primary_host, server.primary_port);
connectWithPrimary();
}
Expand Down
1 change: 1 addition & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2967,6 +2967,7 @@ void initServer(void) {
server.reply_buffer_resizing_enabled = 1;
server.client_mem_usage_buckets = NULL;
server.debug_client_enforce_reply_list = 0;
server.debug_force_free_primary_async = 0;
resetReplicationBuffer();

/* Make sure the locale is set on startup based on the config file. */
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,7 @@ struct valkeyServer {
int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */
int enable_debug_assert; /* Enable debug asserts */
int debug_client_enforce_reply_list; /* Force client to always use the reply list */
int debug_force_free_primary_async; /* Force freeClient on primary to use async path */
/* Reply construction copy avoidance */
int min_io_threads_copy_avoid; /* Minimum number of IO threads for copy avoidance in reply construction */
int min_string_size_copy_avoid_threaded; /* Minimum bulk string size for copy avoidance in reply construction when IO threads enabled */
Expand Down
1 change: 1 addition & 0 deletions src/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,7 @@ static int connTLSConnect(connection *conn_,
unsigned char addr_buf[sizeof(struct in6_addr)];

if (conn->c.state != CONN_STATE_NONE) return C_ERR;
if (addr == NULL) return C_ERR;
ERR_clear_error();

/* Check whether addr is an IP address, if not, use the value for Server Name Indication */
Expand Down
48 changes: 47 additions & 1 deletion tests/unit/wait.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ tags {"wait aof network external:skip"} {
set rd [valkey_deferring_client -1]
$rd incr foo
$rd read
$rd waitaof 0 1 0
$rd waitaof 0 1 10000
wait_for_blocked_client -1
$replica replicaof $master_host $master_port
assert_equal [$rd read] {1 1}
Expand Down Expand Up @@ -532,3 +532,49 @@ start_server {} {
}
}
}

start_server {tags {"wait network external:skip"}} {
Comment thread
ranshid marked this conversation as resolved.
start_server {} {
start_server {} {
set master1 [srv -2 client]
set master1_host [srv -2 host]
set master1_port [srv -2 port]

set master2 [srv -1 client]
set master2_host [srv -1 host]
set master2_port [srv -1 port]

set replica [srv 0 client]

# Regression test: repointing a replica from master1 to master2 while the
# free of the old primary client is deferred must result in exactly one
# "Connecting to PRIMARY" log line for master2.
test {Repoint replica with deferred freeClient does not double-connect} {
# Step 1: attach the replica to master1 and wait until the link is up.
$replica replicaof $master1_host $master1_port
wait_for_condition 50 100 {
[s 0 master_link_status] eq {up}
} else {
fail "Replication to master1 not established"
}

# Step 2: arm the debug switch. Per the networking.c change in this diff,
# the next freeClient call on the primary client is routed through
# freeClientAsync and the switch clears itself, so it fires once.
$replica debug force-free-primary-async 1

# Step 3: remember the current log position, then repoint to master2 and
# wait for the new link to come up.
set log_lines [count_log_lines 0]
$replica replicaof $master2_host $master2_port
wait_for_condition 50 200 {
[s 0 master_link_status] eq {up}
} else {
fail "Replication to master2 not established after repoint"
}
# Fixed 2 second pause before reading the log. NOTE(review): presumably this
# gives the deferred free time to run so that a spurious second connect
# attempt would already be logged - confirm the delay is sufficient.
after 2000

# Step 4: read only the log lines written after the recorded position and
# count the connect attempts towards master2.
set logfile [srv 0 stdout]
set lines [split [exec tail -n +[expr {$log_lines + 1}] < $logfile] "\n"]
set sync_count 0
foreach line $lines {
if {[string match "*Connecting to PRIMARY $master2_host:$master2_port*" $line]} {
incr sync_count
}
}
# Exactly one connect attempt is expected; more than one means the deferred
# disconnection handling triggered an extra reconnect.
assert_equal 1 $sync_count
}
}
}
}
Loading