Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2328,11 +2328,11 @@ int rewriteModuleObject(rio *r, robj *key, robj *o, int dbid) {
}

static int rewriteFunctions(rio *aof) {
dict *functions = functionsLibGet();
dictIterator *iter = dictGetIterator(functions);
hashtable *functions = functionsLibGet();
hashtableIterator *iter = dictGetIterator(functions);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
functionLibInfo *li = dictGetVal(entry);
functionLibInfo *li = entry->v.val;
if (rioWrite(aof, "*3\r\n", 4) == 0) goto werr;
char function_load[] = "$8\r\nFUNCTION\r\n$4\r\nLOAD\r\n";
if (rioWrite(aof, function_load, sizeof(function_load) - 1) == 0) goto werr;
Expand Down
20 changes: 10 additions & 10 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -457,23 +457,23 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
/* In case key[j] did not have blocking clients yet, we need to create a new list */
if (db_blocked_entry != NULL) {
l = listCreate();
dictSetVal(c->db->blocking_keys, db_blocked_entry, l);
db_blocked_entry->v.val = l;
incrRefCount(keys[j]);
} else {
l = dictGetVal(db_blocked_existing_entry);
l = db_blocked_existing_entry->v.val;
}
listAddNodeTail(l, c);
dictSetVal(c->bstate->keys, client_blocked_entry, listLast(l));
client_blocked_entry->v.val = listLast(l);

/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
* wants to be awakened if key is deleted (like XREADGROUP) */
if (unblock_on_nokey) {
db_blocked_entry = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], &db_blocked_existing_entry);
if (db_blocked_entry) {
incrRefCount(keys[j]);
dictSetUnsignedIntegerVal(db_blocked_entry, 1);
db_blocked_entry->v.u64 = 1;
} else {
dictIncrUnsignedIntegerVal(db_blocked_existing_entry, 1);
db_blocked_existing_entry->v.u64 += 1;
}
}
}
Expand All @@ -489,7 +489,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
* Internal function for unblockClient() */
static void unblockClientWaitingData(client *c) {
dictEntry *de;
dictIterator *di;
hashtableIterator *di;

if (dictSize(c->bstate->keys) == 0) return;

Expand Down Expand Up @@ -582,8 +582,8 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) {
void *key;
dictEntry *unblock_on_nokey_entry;

key = dictGetKey(de);
pos = dictGetVal(de);
key = de->key;
pos = de->v.val;
/* Remove this client from the list of clients waiting for this key. */
l = dictFetchValue(c->db->blocking_keys, key);
serverAssertWithInfo(c, key, l != NULL);
Expand All @@ -602,7 +602,7 @@ static void releaseBlockedEntry(client *c, dictEntry *de, int remove_key) {
unblock_on_nokey_entry = dictFind(c->db->blocking_keys_unblock_on_nokey, key);
/* it is not possible to have a client blocked on nokey with no matching entry */
serverAssertWithInfo(c, key, unblock_on_nokey_entry != NULL);
if (!dictIncrUnsignedIntegerVal(unblock_on_nokey_entry, -1)) {
if (!--unblock_on_nokey_entry->v.u64) {
/* in case the count is zero, we can delete the entry */
dictDelete(c->db->blocking_keys_unblock_on_nokey, key);
}
Expand All @@ -627,7 +627,7 @@ static void handleClientsBlockedOnKey(readyList *rl) {
dictEntry *de = dictFind(rl->db->blocking_keys, rl->key);

if (de) {
list *clients = dictGetVal(de);
list *clients = de->v.val;
listNode *ln;
listIter li;
listRewind(clients, &li);
Expand Down
14 changes: 7 additions & 7 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,10 @@ migrateCachedSocket *migrateGetSocket(client *c, robj *host, robj *port, long ti
if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
/* Too many items, drop one at random. */
dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
cs = dictGetVal(de);
cs = de->v.val;
connClose(cs->conn);
zfree(cs);
dictDelete(server.migrate_cached_sockets, dictGetKey(de));
dictDelete(server.migrate_cached_sockets, de->key);
}

/* Create the connection */
Expand Down Expand Up @@ -405,16 +405,16 @@ void migrateCloseSocket(robj *host, robj *port) {
}

void migrateCloseTimedoutSockets(void) {
dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
hashtableIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
dictEntry *de;

while ((de = dictNext(di)) != NULL) {
migrateCachedSocket *cs = dictGetVal(de);
migrateCachedSocket *cs = de->v.val;

if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
connClose(cs->conn);
zfree(cs);
dictDelete(server.migrate_cached_sockets, dictGetKey(de));
dictDelete(server.migrate_cached_sockets, de->key);
}
}
dictReleaseIterator(di);
Expand Down Expand Up @@ -1353,7 +1353,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
if (c->flag.blocked && (c->bstate->btype == BLOCKED_LIST || c->bstate->btype == BLOCKED_ZSET ||
c->bstate->btype == BLOCKED_STREAM || c->bstate->btype == BLOCKED_MODULE)) {
dictEntry *de;
dictIterator *di;
hashtableIterator *di;

/* If the client is blocked on module, but not on a specific key,
* don't unblock it. */
Expand All @@ -1371,7 +1371,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
/* All keys must belong to the same slot, so check first key only. */
di = dictGetIterator(c->bstate->keys);
if ((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
robj *key = de->key;
int slot = keyHashSlot((char *)objectGetVal(key), sdslen(objectGetVal(key)));
serverAssert(slot == c->slot);
clusterNode *node = getNodeBySlot(slot);
Expand Down
Loading
Loading