Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
362 changes: 247 additions & 115 deletions src/cluster.c

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void clusterInitLast(void);
void clusterCron(void);
void clusterBeforeSleep(void);
int verifyClusterConfigWithData(void);
void clusterPrepareShutdown(void);
void clusterHandleServerShutdown(bool auto_failover);

int clusterSendModuleMessageToTarget(const char *target,
Expand Down Expand Up @@ -156,6 +157,7 @@ int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s);
void clusterSlotChange(slotRange *ranges, int numranges, clusterNode *target, void *ctx, void (*callback)(void *ctx, const char *error));
void clusterCancelManualFailover(void);
void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required);
void clusterHandleLostLastSlot(clusterNode *target);
void clusterScheduleHandleSlotMigration(void);
mstime_t clusterComputeMfPauseEnd(void);

Expand Down
3 changes: 2 additions & 1 deletion src/cluster_bus.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ typedef struct clusterBusType {
void (*initLast)(void);
void (*cron)(void);
void (*beforeSleep)(void);
void (*handleServerShutdown)(bool auto_failover);
void (*prepareShutdown)(void); /* Called early in shutdown to allow graceful handoff. */
void (*handleServerShutdown)(void);

/* Cluster link message handling — called from cluster_link.c */

Expand Down
324 changes: 25 additions & 299 deletions src/cluster_legacy.c

Large diffs are not rendered by default.

25 changes: 21 additions & 4 deletions src/cluster_link.c
Original file line number Diff line number Diff line change
Expand Up @@ -409,22 +409,39 @@ void clusterConnectNodes(void) {
mstime_t reconnect_interval = server.cluster_node_timeout / 2;
if (reconnect_interval <= 0) reconnect_interval = 1;

/* Budget: try to contact every node NODE_CONNECTION_RETRIES_PER_TIMEOUT
* times within node_timeout. Each cron tick gets a proportional share. */
long long budget = (long long)dictSize(server.cluster->nodes) *
CLUSTER_CRON_PERIOD_MS / reconnect_interval *
NODE_CONNECTION_RETRIES_PER_TIMEOUT;
if (budget < 1) budget = 1;

dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node == myself) continue;
if (node->link) continue;
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_NOADDR)) continue;

/* Free links that exceeded the send buffer limit. */
freeClusterLinkOnBufferLimitReached(node->link);
freeClusterLinkOnBufferLimitReached(node->inbound_link);

/* Remove handshake nodes that have timed out. */
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
clusterDelNode(node);
continue;
}

/* Throttle reconnection attempts. */
if (now - node->outbound_link_attempt_time < reconnect_interval) continue;
if (node->link) continue;

/* Throttle reconnection attempts. If an inbound link exists the peer
* already knows us, so reconnect immediately without throttling. */
if (!node->inbound_link) {
mstime_t backoff = reconnect_interval / NODE_CONNECTION_RETRIES_PER_TIMEOUT;
if (now - node->outbound_link_attempt_time < backoff && budget <= 0) continue;
}
node->outbound_link_attempt_time = now;
budget--;

clusterLink *link = createClusterLink(node);
link->conn = connCreate(connTypeOfCluster());
Expand Down
3 changes: 3 additions & 0 deletions src/cluster_link.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ typedef struct clusterNode clusterNode;
#define RCVBUF_MIN_READ_LEN 14
#define RCVBUF_MAX_PREALLOC (1 << 20) /* 1MB */

/* How many times per node_timeout we attempt to reconnect to each node. */
#define NODE_CONNECTION_RETRIES_PER_TIMEOUT 10

/* A refcounted block of bytes queued for sending on a cluster link.
* The data member is uint64_t to ensure alignment for protocol
* implementations that cast it to message structs with uint64_t fields. */
Expand Down
238 changes: 110 additions & 128 deletions src/cluster_nodes.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,103 @@ static sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pa
return ci;
}

/* Serialize the address and auxiliary fields of 'node' onto the end of 's'
 * and return the (possibly reallocated) sds. The emitted text is
 *
 *     ip:port@cport,[hostname][,aux=val]*
 *
 * i.e. the layout of the second column of nodes.conf. The comma after the
 * cluster bus port is always written, even when the hostname is empty, so the
 * hostname slot is always present ahead of the aux fields.
 *
 * 'tls_primary' selects which port is written inline after the ip: the TLS
 * port when non-zero, the TCP port otherwise. The aux field that corresponds
 * to that inline port is not emitted again. */
sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary) {
    int primary_port, primary_aux;

    if (tls_primary) {
        primary_port = node->tls_port;
        primary_aux = af_tls_port;
    } else {
        primary_port = node->tcp_port;
        primary_aux = af_tcp_port;
    }

    /* ip:port@cport */
    s = sdscatfmt(s, "%s:%i@%i", node->ip, primary_port, node->cport);

    /* Hostname slot: a bare separator when there is no hostname. */
    if (sdslen(node->hostname) == 0) {
        s = sdscatlen(s, ",", 1);
    } else {
        s = sdscatfmt(s, ",%s", node->hostname);
    }

    /* Aux fields, walked from the last handler down to the first. */
    int idx = af_count;
    while (idx-- > 0) {
        if (idx == primary_aux) continue; /* Already written inline above. */
        if (!auxFieldHandlers[idx].isPresent(node)) continue;
        s = sdscatfmt(s, ",%s=", auxFieldHandlers[idx].field);
        s = auxFieldHandlers[idx].getter(node, s);
    }
    return s;
}

/* Parse an address+aux string and apply it to node 'n'. The expected format is
 *
 *     ip:port[@cport][,hostname][,aux=val]*
 *
 * 'str' must be a NUL-terminated string. It is not written to: the string is
 * split into sds tokens and only those tokens are modified while parsing.
 *
 * Fields are applied in this order: hostname, aux fields, ip, port(s), cport.
 * When neither a tcp nor a tls aux port is present, the inline port is stored
 * as the TLS port if server.tls_cluster is set and as the TCP port otherwise.
 * When exactly one of them is present, the inline port fills the other one.
 * A missing "@cport" defaults to the node's default client port plus
 * CLUSTER_PORT_INCR.
 *
 * Returns C_OK on success, C_ERR on parse error. On C_ERR the node may have
 * been partially updated: the hostname and any aux fields applied before the
 * error was detected are not rolled back. */
int clusterNodeParseAddressString(clusterNode *n, char *str) {
    int aux_argc;
    sds *aux_argv = sdssplitlen(str, strlen(str), ",", 1, &aux_argc);
    if (aux_argv == NULL) return C_ERR;

    /* aux_argv[0] (the ip:port@cport token) is dereferenced below, so make
     * sure the split produced at least one token. */
    if (aux_argc < 1) goto err;

    /* Hostname: optional second token. An absent or empty token clears any
     * hostname already set on the node. */
    if (aux_argc > 1 && sdslen(aux_argv[1]) > 0) {
        n->hostname = sdscpy(n->hostname, aux_argv[1]);
    } else if (sdslen(n->hostname) != 0) {
        sdsclear(n->hostname);
    }

    /* Aux fields: every token after the hostname must be "name=value" with a
     * known name. Track whether the tcp/tls ports were given explicitly since
     * that decides where the inline port goes. */
    int aux_tcp_port = 0, aux_tls_port = 0;
    for (int i = 2; i < aux_argc; i++) {
        int field_argc;
        sds *field_argv = sdssplitlen(aux_argv[i], sdslen(aux_argv[i]), "=", 1, &field_argc);
        if (field_argv == NULL || field_argc != 2) {
            if (field_argv) sdsfreesplitres(field_argv, field_argc);
            goto err;
        }
        if (!isValidAuxString(field_argv[0], sdslen(field_argv[0])) ||
            !isValidAuxString(field_argv[1], sdslen(field_argv[1]))) {
            sdsfreesplitres(field_argv, field_argc);
            goto err;
        }
        int found = 0;
        for (unsigned j = 0; j < af_count; j++) {
            if (sdslen(field_argv[0]) != strlen(auxFieldHandlers[j].field) ||
                memcmp(field_argv[0], auxFieldHandlers[j].field, sdslen(field_argv[0])) != 0)
                continue;
            found = 1;
            aux_tcp_port |= j == af_tcp_port;
            aux_tls_port |= j == af_tls_port;
            if (auxFieldHandlers[j].setter(n, field_argv[1], sdslen(field_argv[1])) != C_OK) {
                sdsfreesplitres(field_argv, field_argc);
                goto err;
            }
        }
        sdsfreesplitres(field_argv, field_argc);
        if (!found) goto err; /* Unknown aux field name. */
    }

    /* ip:port@cport. The last ':' separates the ip from the port, so ips that
     * themselves contain ':' are kept whole. */
    char *p = strrchr(aux_argv[0], ':');
    if (!p) goto err;
    *p = '\0';

    /* Refuse an ip that does not fit (including its terminator) instead of
     * writing past the end of the destination.
     * NOTE(review): this relies on n->ip being a fixed-size char array so that
     * sizeof yields its capacity -- confirm against struct clusterNode. */
    size_t ip_len = (size_t)(p - aux_argv[0]);
    if (ip_len >= sizeof(n->ip)) goto err;
    memcpy(n->ip, aux_argv[0], ip_len + 1);

    char *port = p + 1;
    char *busp = strchr(port, '@');
    if (busp) {
        *busp = '\0';
        busp++;
    }
    if (!aux_tcp_port && !aux_tls_port) {
        /* No explicit port aux fields: the inline port is the only one known
         * and its kind follows the local tls-cluster setting. */
        if (server.tls_cluster)
            n->tls_port = atoi(port);
        else
            n->tcp_port = atoi(port);
    } else if (!aux_tcp_port) {
        n->tcp_port = atoi(port);
    } else if (!aux_tls_port) {
        n->tls_port = atoi(port);
    }
    n->cport = busp ? atoi(busp) : (getNodeDefaultClientPort(n) + CLUSTER_PORT_INCR);

    sdsfreesplitres(aux_argv, aux_argc);
    return C_OK;

err:
    sdsfreesplitres(aux_argv, aux_argc);
    return C_ERR;
}

/* Generate a csv-alike representation of the specified cluster node.
* See clusterGenNodesDescription() top comment for more information.
*
Expand All @@ -337,29 +434,20 @@ static sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pa
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) {
int j, start;
sds ci;
int port = clusterNodeClientPort(node, tls_primary, c);
char *ip = clusterNodeIp(node, c);

/* Node coordinates */
ci = sdscatlen(sdsempty(), node->name, CLUSTER_NAMELEN);
ci = sdscatfmt(ci, " %s:%i@%i", ip, port, node->cport);
if (sdslen(node->hostname) != 0) {
ci = sdscatfmt(ci, ",%s", node->hostname);
}
/* Don't expose aux fields to any clients yet but do allow them
* to be persisted to nodes.conf */
ci = sdscatlen(ci, " ", 1);
if (c == NULL) {
if (sdslen(node->hostname) == 0) {
ci = sdscatfmt(ci, ",", 1);
}
for (int i = af_count - 1; i >= 0; i--) {
if ((tls_primary && i == af_tls_port) || (!tls_primary && i == af_tcp_port)) {
continue;
}
if (auxFieldHandlers[i].isPresent(node)) {
ci = sdscatfmt(ci, ",%s=", auxFieldHandlers[i].field);
ci = auxFieldHandlers[i].getter(node, ci);
}
/* nodes.conf: use the canonical address+aux format */
ci = clusterNodeAppendAddressString(ci, node, tls_primary);
} else {
/* CLUSTER NODES reply: use client-facing IP/port, no aux fields */
int port = clusterNodeClientPort(node, tls_primary, c);
char *ip = clusterNodeIp(node, c);
ci = sdscatfmt(ci, "%s:%i@%i", ip, port, node->cport);
if (sdslen(node->hostname) != 0) {
ci = sdscatfmt(ci, ",%s", node->hostname);
}
}

Expand Down Expand Up @@ -553,8 +641,8 @@ int clusterLoadConfig(char *filename) {
line = zmalloc(maxline);
tmp_cluster_nodes = dictCreate(&clusterNodesDictType);
while (fgets(line, maxline, fp) != NULL) {
int argc, aux_argc;
sds *argv, *aux_argv;
int argc;
sds *argv;
clusterNode *n, *primary;
char *p, *s;

Expand Down Expand Up @@ -611,117 +699,11 @@ int clusterLoadConfig(char *filename) {
}
/* Format for the node address and auxiliary argument information:
* ip:port[@cport][,hostname][,aux=val]* */

aux_argv = sdssplitlen(argv[1], sdslen(argv[1]), ",", 1, &aux_argc);
if (aux_argv == NULL) {
if (clusterNodeParseAddressString(n, argv[1]) == C_ERR) {
sdsfreesplitres(argv, argc);
goto fmterr;
}

/* Hostname is an optional argument that defines the endpoint
* that can be reported to clients instead of IP. */
if (aux_argc > 1 && sdslen(aux_argv[1]) > 0) {
n->hostname = sdscpy(n->hostname, aux_argv[1]);
} else if (sdslen(n->hostname) != 0) {
sdsclear(n->hostname);
}

/* All fields after hostname are auxiliary and they take on
* the format of "aux=val" where both aux and val can contain
* characters that pass the isValidAuxChar check only. The order
* of the aux fields is insignificant. */
int aux_tcp_port = 0;
int aux_tls_port = 0;
for (int i = 2; i < aux_argc; i++) {
int field_argc;
sds *field_argv;
field_argv = sdssplitlen(aux_argv[i], sdslen(aux_argv[i]), "=", 1, &field_argc);
if (field_argv == NULL || field_argc != 2) {
/* Invalid aux field format */
if (field_argv != NULL) sdsfreesplitres(field_argv, field_argc);
sdsfreesplitres(aux_argv, aux_argc);
sdsfreesplitres(argv, argc);
goto fmterr;
}

/* Validate that both aux and value contain valid characters only */
for (unsigned j = 0; j < 2; j++) {
if (!isValidAuxString(field_argv[j], sdslen(field_argv[j]))) {
/* Invalid aux field format */
sdsfreesplitres(field_argv, field_argc);
sdsfreesplitres(aux_argv, aux_argc);
sdsfreesplitres(argv, argc);
goto fmterr;
}
}

/* Note that we don't expect lots of aux fields in the foreseeable
* future so a linear search is completely fine. */
int field_found = 0;
for (unsigned j = 0; j < af_count; j++) {
if (sdslen(field_argv[0]) != strlen(auxFieldHandlers[j].field) ||
memcmp(field_argv[0], auxFieldHandlers[j].field, sdslen(field_argv[0])) != 0) {
continue;
}
field_found = 1;
aux_tcp_port |= j == af_tcp_port;
aux_tls_port |= j == af_tls_port;
if (auxFieldHandlers[j].setter(n, field_argv[1], sdslen(field_argv[1])) != C_OK) {
/* Invalid aux field format */
sdsfreesplitres(field_argv, field_argc);
sdsfreesplitres(aux_argv, aux_argc);
sdsfreesplitres(argv, argc);
goto fmterr;
}
}

if (field_found == 0) {
/* Invalid aux field format */
sdsfreesplitres(field_argv, field_argc);
sdsfreesplitres(aux_argv, aux_argc);
sdsfreesplitres(argv, argc);
goto fmterr;
}

sdsfreesplitres(field_argv, field_argc);
}
/* Address and port */
if ((p = strrchr(aux_argv[0], ':')) == NULL) {
sdsfreesplitres(aux_argv, aux_argc);
sdsfreesplitres(argv, argc);
goto fmterr;
}
*p = '\0';
memcpy(n->ip, aux_argv[0], strlen(aux_argv[0]) + 1);
char *port = p + 1;
char *busp = strchr(port, '@');
if (busp) {
*busp = '\0';
busp++;
}
/* If neither TCP or TLS port is found in aux field, it is considered
* an old version of nodes.conf file.*/
if (!aux_tcp_port && !aux_tls_port) {
if (server.tls_cluster) {
n->tls_port = atoi(port);
} else {
n->tcp_port = atoi(port);
}
} else if (!aux_tcp_port) {
n->tcp_port = atoi(port);
} else if (!aux_tls_port) {
n->tls_port = atoi(port);
}
/* In older versions of nodes.conf the "@busport" part is missing.
* In this case we set it to the default offset of 10000 from the
* base port. */
n->cport = busp ? atoi(busp) : (getNodeDefaultClientPort(n) + CLUSTER_PORT_INCR);

/* The plaintext port for client in a TLS cluster (n->pport) is not
* stored in nodes.conf. It is received later over the bus protocol. */

sdsfreesplitres(aux_argv, aux_argc);

/* Parse flags */
p = s = argv[2];
while (p) {
Expand Down
4 changes: 4 additions & 0 deletions src/cluster_nodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

#include "cluster.h"

/* Node address string: ip:port@cport[,hostname][,aux=val]* */
sds clusterNodeAppendAddressString(sds s, clusterNode *node, int tls_primary);
int clusterNodeParseAddressString(clusterNode *n, char *str);

/* Node description / serialization. */
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary);
sds clusterGenNodesDescription(client *c, int filter, int tls_primary);
Expand Down
1 change: 1 addition & 0 deletions src/cluster_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ struct clusterNode {
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
int is_node_healthy; /* Boolean indicating the cached node health.
Update with updateAndCountChangedNodeHealth(). */
mstime_t orphaned_time; /* Starting time of orphaned primary condition */
void *protocol_data; /* Protocol-specific data (e.g. clusterNodeLegacyData) */
};

Expand Down
3 changes: 3 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -4763,6 +4763,9 @@ int prepareForShutdown(client *c, int flags) {
}
if (server.supervised_mode == SUPERVISED_SYSTEMD) serverCommunicateSystemd("STOPPING=1\n");

/* Allow the cluster protocol to initiate graceful handoff (e.g. leader transfer). */
if (server.cluster_enabled) clusterPrepareShutdown();

/* If we have any replicas, let them catch up the replication offset before
* we shut down, to avoid data loss. */
if (!(flags & SHUTDOWN_NOW) && server.shutdown_timeout != 0 && !isReadyToShutdown()) {
Expand Down
Loading