@@ -621,7 +621,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
         serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
         /* Create a link object we use to handle the connection.
          * It gets passed to the readable handler when data is available.
-         * Initiallly the link->node pointer is set to NULL as we don't know
+         * Initially the link->node pointer is set to NULL as we don't know
          * which node is, but the right node is references once we know the
          * node identity. */
         link = createClusterLink(NULL);
@@ -872,7 +872,7 @@ int clusterAddNode(clusterNode *node) {
     return (retval == DICT_OK) ? C_OK : C_ERR;
 }
 
-/* Remove a node from the cluster. The functio performs the high level
+/* Remove a node from the cluster. The function performs the high level
  * cleanup, calling freeClusterNode() for the low level cleanup.
  * Here we do the following:
  *
@@ -969,7 +969,7 @@ uint64_t clusterGetMaxEpoch(void) {
  * 3) Persist the configuration on disk before sending packets with the
  *    new configuration.
  *
- * If the new config epoch is generated and assigend, C_OK is returned,
+ * If the new config epoch is generated and assigned, C_OK is returned,
  * otherwise C_ERR is returned (since the node has already the greatest
  * configuration around) and no operation is performed.
  *
@@ -1042,7 +1042,7 @@ int clusterBumpConfigEpochWithoutConsensus(void) {
  *
  * In general we want a system that eventually always ends with different
  * masters having different configuration epochs whatever happened, since
- * nothign is worse than a split-brain condition in a distributed system.
+ * nothing is worse than a split-brain condition in a distributed system.
  *
  * BEHAVIOR
  *
@@ -1101,7 +1101,7 @@ void clusterHandleConfigEpochCollision(clusterNode *sender) {
  * entries from the black list. This is an O(N) operation but it is not a
  * problem since add / exists operations are called very infrequently and
  * the hash table is supposed to contain very little elements at max.
- * However without the cleanup during long uptimes and with some automated
+ * However without the cleanup during long uptime and with some automated
  * node add/removal procedures, entries could accumulate. */
 void clusterBlacklistCleanup(void) {
     dictIterator *di;
@@ -1669,7 +1669,7 @@ int clusterProcessPacket(clusterLink *link) {
     /* Check if the sender is a known node. */
     sender = clusterLookupNode(hdr->sender);
     if (sender && !nodeInHandshake(sender)) {
-        /* Update our curretEpoch if we see a newer epoch in the cluster. */
+        /* Update our currentEpoch if we see a newer epoch in the cluster. */
         senderCurrentEpoch = ntohu64(hdr->currentEpoch);
         senderConfigEpoch = ntohu64(hdr->configEpoch);
         if (senderCurrentEpoch > server.cluster->currentEpoch)
@@ -2294,7 +2294,7 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
 }
 
 /* Send a PING or PONG packet to the specified node, making sure to add enough
- * gossip informations. */
+ * gossip information. */
 void clusterSendPing(clusterLink *link, int type) {
     unsigned char *buf;
     clusterMsg *hdr;
@@ -2314,7 +2314,7 @@ void clusterSendPing(clusterLink *link, int type) {
      * node_timeout we exchange with each other node at least 4 packets
      * (we ping in the worst case in node_timeout/2 time, and we also
      * receive two pings from the host), we have a total of 8 packets
-     * in the node_timeout*2 falure reports validity time. So we have
+     * in the node_timeout*2 failure reports validity time. So we have
      * that, for a single PFAIL node, we can expect to receive the following
      * number of failure reports (in the specified window of time):
      *
@@ -2331,7 +2331,7 @@ void clusterSendPing(clusterLink *link, int type) {
      * same time.
      *
      * Since we have non-voting slaves that lower the probability of an entry
-     * to feature our node, we set the number of entires per packet as
+     * to feature our node, we set the number of entries per packet as
      * 10% of the total nodes we have. */
     wanted = floor(dictSize(server.cluster->nodes)/10);
     if (wanted < 3) wanted = 3;
@@ -2378,7 +2378,7 @@ void clusterSendPing(clusterLink *link, int type) {
         if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
             (this->link == NULL && this->numslots == 0))
         {
-            freshnodes--; /* Tecnically not correct, but saves CPU. */
+            freshnodes--; /* Technically not correct, but saves CPU. */
             continue;
         }
 
@@ -2901,7 +2901,7 @@ void clusterHandleSlaveFailover(void) {
         }
     }
 
-    /* If the previous failover attempt timedout and the retry time has
+    /* If the previous failover attempt timeout and the retry time has
      * elapsed, we can setup a new one. */
     if (auth_age > auth_retry_time) {
         server.cluster->failover_auth_time = mstime() +
@@ -2994,7 +2994,7 @@ void clusterHandleSlaveFailover(void) {
                 (unsigned long long) myself->configEpoch);
         }
 
-        /* Take responsability for the cluster slots. */
+        /* Take responsibility for the cluster slots. */
         clusterFailoverReplaceYourMaster();
     } else {
         clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
@@ -3006,7 +3006,7 @@ void clusterHandleSlaveFailover(void) {
  *
  * Slave migration is the process that allows a slave of a master that is
  * already covered by at least another slave, to "migrate" to a master that
- * is orpaned, that is, left with no working slaves.
+ * is orphaned, that is, left with no working slaves.
  * ------------------------------------------------------------------------- */
 
 /* This function is responsible to decide if this replica should be migrated
@@ -3023,7 +3023,7 @@ void clusterHandleSlaveFailover(void) {
  * the nodes anyway, so we spend time into clusterHandleSlaveMigration()
  * if definitely needed.
  *
- * The fuction is called with a pre-computed max_slaves, that is the max
+ * The function is called with a pre-computed max_slaves, that is the max
  * number of working (not in FAIL state) slaves for a single master.
  *
  * Additional conditions for migration are examined inside the function.
@@ -3045,7 +3045,7 @@ void clusterHandleSlaveMigration(int max_slaves) {
             !nodeTimedOut(mymaster->slaves[j])) okslaves++;
     if (okslaves <= server.cluster_migration_barrier) return;
 
-    /* Step 3: Idenitfy a candidate for migration, and check if among the
+    /* Step 3: Identify a candidate for migration, and check if among the
      * masters with the greatest number of ok slaves, I'm the one with the
      * smallest node ID (the "candidate slave").
      *
@@ -3141,7 +3141,7 @@ void clusterHandleSlaveMigration(int max_slaves) {
  * data loss due to the asynchronous master-slave replication.
  * -------------------------------------------------------------------------- */
 
-/* Reset the manual failover state. This works for both masters and slavesa
+/* Reset the manual failover state. This works for both masters and slaves
  * as all the state about manual failover is cleared.
  *
  * The function can be used both to initialize the manual failover state at
@@ -3549,12 +3549,12 @@ int clusterNodeSetSlotBit(clusterNode *n, int slot) {
          * target for replicas migration, if and only if at least one of
          * the other masters has slaves right now.
          *
-         * Normally masters are valid targerts of replica migration if:
+         * Normally masters are valid targets of replica migration if:
          * 1. The used to have slaves (but no longer have).
         * 2. They are slaves failing over a master that used to have slaves.
          *
          * However new masters with slots assigned are considered valid
-         * migration tagets if the rest of the cluster is not a slave-less.
+         * migration targets if the rest of the cluster is not a slave-less.
          *
          * See https://github.com/antirez/redis/issues/3043 for more info. */
         if (n->numslots == 1 && clusterMastersHaveSlaves())
@@ -3627,7 +3627,7 @@ void clusterCloseAllSlots(void) {
  * -------------------------------------------------------------------------- */
 
 /* The following are defines that are only used in the evaluation function
- * and are based on heuristics. Actaully the main point about the rejoin and
+ * and are based on heuristics. Actually the main point about the rejoin and
  * writable delay is that they should be a few orders of magnitude larger
  * than the network latency. */
 #define CLUSTER_MAX_REJOIN_DELAY 5000
@@ -3738,7 +3738,7 @@ void clusterUpdateState(void) {
  *    A) If no other node is in charge according to the current cluster
  *       configuration, we add these slots to our node.
  *    B) If according to our config other nodes are already in charge for
- *       this lots, we set the slots as IMPORTING from our point of view
+ *       this slots, we set the slots as IMPORTING from our point of view
  *       in order to justify we have those slots, and in order to make
  *       redis-trib aware of the issue, so that it can try to fix it.
  * 2) If we find data in a DB different than DB0 we return C_ERR to
@@ -4076,7 +4076,7 @@ void clusterCommand(client *c) {
 "forget <node-id> -- Remove a node from the cluster.",
 "getkeysinslot <slot> <count> -- Return key names stored by current node in a slot.",
 "flushslots -- Delete current node own slots information.",
-"info - Return onformation about the cluster.",
+"info - Return information about the cluster.",
 "keyslot <key> -- Return the hash slot for <key>.",
 "meet <ip> <port> [bus-port] -- Connect nodes into a working cluster.",
 "myid -- Return the node id.",
@@ -4256,7 +4256,7 @@ NULL
             }
             /* If this slot is in migrating status but we have no keys
              * for it assigning the slot to another node will clear
-             * the migratig status. */
+             * the migrating status. */
             if (countKeysInSlot(slot) == 0 &&
                 server.cluster->migrating_slots_to[slot])
                 server.cluster->migrating_slots_to[slot] = NULL;
@@ -4880,7 +4880,7 @@ void migrateCloseSocket(robj *host, robj *port) {
     sdsfree(name);
 }
 
-void migrateCloseTimedoutSockets(void) {
+void migrateCloseTimeoutSockets(void) {
     dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
     dictEntry *de;
 
@@ -5244,7 +5244,7 @@ void readwriteCommand(client *c) {
  * resharding in progress).
  *
 * On success the function returns the node that is able to serve the request.
- * If the node is not 'myself' a redirection must be perfomed. The kind of
+ * If the node is not 'myself' a redirection must be performed. The kind of
  * redirection is specified setting the integer passed by reference
  * 'error_code', which will be set to CLUSTER_REDIR_ASK or
  * CLUSTER_REDIR_MOVED.
@@ -5363,7 +5363,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
                 }
             }
 
-            /* Migarting / Improrting slot? Count keys we don't have. */
+            /* Migrating / Importing slot? Count keys we don't have. */
             if ((migrating_slot || importing_slot) &&
                 lookupKeyRead(&server.db[0],thiskey) == NULL)
             {
@@ -5443,7 +5443,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
     if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
         addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
     } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
-        /* The request spawns mutliple keys in the same slot,
+        /* The request spawns multiple keys in the same slot,
          * but the slot is not "stable" currently as there is
          * a migration or import in progress. */
         addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));