redis 集群 cluster
集群是redis分布式数据库方案,通过分片进行数据共享,并提供复制和故障转移功能。
关键技术点:集群节点、槽指派、命令执行、重新分片、转向、故障转移、消息。
节点
向一个节点发送命令cluster meet <ip> <port>
将指定ip节点添加为集群
在集群模式下会用到三个结构:clusterNode、clusterLink、clusterState
clusterNode 保存节点的当前状态,比如节点的创建时间,节点的名字,配置纪元,ip+端口号。
typedef struct clusterNode {
mstime_t ctime; /* Node object creation time. */ 节点创建时间
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ 节点名字 由40个16进制字符组成
int flags; /* CLUSTER_NODE_... */ 节点角色
uint64_t configEpoch; /* Last configEpoch observed for this node */ 配置纪元,用于故障转移
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
int port; /* Latest known clients port of this node */
int cport; /* Latest known cluster port of this node. */
clusterLink *link; /* TCP/IP link with this node */ 连接节点有关信息
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
clusterLink 结构保存了连接节点的有关信息,比如套接字描述符、输入输入缓冲区等
/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
mstime_t ctime; /* Link creation time */ 连接的创建时间
connection *conn; /* Connection to remote node */
sds sndbuf; /* Packet send buffer */ 输出缓冲区,保存等待发送给其他节点的消息
sds rcvbuf; /* Packet reception buffer */ 输入缓冲区,保存其他节点接收到的消息
struct clusterNode *node; /* Node related to this link if any, or NULL */ 与这个连接关联的节点
} clusterLink;
最后 clusterState 结构保存集群的状态,例如集群当前在线还是下线、节点数量、配置纪元等
typedef struct clusterState {
clusterNode *myself; /* This node */ 指向当前节点的指针
uint64_t currentEpoch; 集群当前的配置纪元
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */ 在线还是下线
int size; /* Num of master nodes with at least one slot */ 个数
dict *nodes; /* Hash table of name -> clusterNode structures */ 集群节点名单
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */ 短时间内不再添加的节点名单
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
uint64_t slots_keys_count[CLUSTER_SLOTS];
rax *slots_to_keys;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This slave rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
int cant_failover_reason; /* Why a slave is currently not able to
failover. See the CANT_FAILOVER_* macros. */
/* Manual failover state in common. */
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
long long mf_master_offset; /* Master offset the slave needs to start MF
or zero if stil not received. */
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */
/* The followign fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
/* Messages received and sent by type. */
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
excluding nodes without address. */
} clusterState;
cluster meet命令实现原理
收到命令的节点A与指定的节点B进行握手,确认彼此存在
- 为节点B创建clusterNode结构,并将该结构添加到自己的clusterState.nodes字典里面
- 之后A根据ip和端口,向B节点发一条meet消息
- B收到A的meet消息后,会为A创建一个clusterNode结构,并添加到自己的clusterState.nodes里
- 之后,B向A发送一条pong消息
- A成功接收到pong消息,通过这个消息,A可以确认B收到自己发送的meet消息
- A节点返回一条ping消息
- B收到A的ping消息,说明A已经收到B发送的pong消息,握手完成。
之后A会将B节点信息通过Gossip协议传播给其他节点。最终B被所有节点认识
Gossip协议
常见的分布式共识协议,如Paxos、Raft、ZAB等分布式算法被称为“强一致性的”分布式共识协议,意思是“尽管系统内部可能存在不一致的状态,但从系统外部来看,不一致的情况并不会被观察到,所以整体上系统是强一致性的”。
另外还有一类“最终一致性”的分布式共识协议,这表明系统内不一致状态可能被观察到。
常见的“最终一致性”分布式系统,就是DNS系统。
还有就是Gossip协议。Gossip协议主要应用于比特币、consul(跨数据中心同步)。
虽说Gossip被叫做分布式共识算法,但是其所解决的问题并不是直接与Paxos、Raft这些共识算法等价,只是基于Gossip之上通过某些方法实现与Paxos、Raft算法相类似的目标。
Gossip协议的工作过程,可看做两个步骤的循环:
- 如果某项信息需要在网络的所有节点传播,那从信息源开始,选择一个固定的传播周期(比如1秒),随机选择与它相连接的K个节点(称为fan-out)来传播消息。
- 如果一个节点收到之前未收到过的消息,那么就会在下一个周期内,把消息发送给除了给它发消息的节点之外相邻的k个节点,直到网络中所有节点都收到这条消息。尽管这个过程需要一定时间,但是理论上所有节点都会拥有。
Gossip对网络节点的连通性和稳定性几乎没有要求,表现在:
- 一开始就是和一些节点部分连通,而不是全连通网络。
- 能容忍节点的随意增加与减少,随意宕机与重启,增加或重启的节点的状态,最终与其他节点的状态一致。
Gossip把所有节点一视同仁,没有主节点概念。
Gossip的缺点:
- 消息通过多个轮次的散播到达全网,因此必然存在各节点状态的不一致的情况。
- 消息的冗余。由于是随机选取发送消息的节点,会不可避免的出现消息重复发送给同一节点的情况。冗余发送会增加网络传输压力,也会给节点带来额外的处理负载。
槽指派
集群通过分片的方式保存所有键值对:集群的整个数据库被分为16384个槽slot,集群每个节点可以处理0到16384个槽。
当数据库中所有槽都有节点在处理,集群的状态为上线状态ok,否则为下线状态fail。
槽指派命令:cluster addslots <slot>
slot空格隔开
记录节点的槽指派信息
clusterNode结构的slots、numslots属性记录了节点负责处理哪些槽:
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
int numslots; /* Number of slots handled by this node */
slots是个二进制位数组bit array,长度为16384/8=2048 个字符。包含16384个二进制位。
以0到16383为索引,根据索引i上的值来判断节点是否处理槽i:槽对应的值为1表示当前节点负责处理,0为不处理。
检查和设置节点处理某个槽的时间复杂度都为O(1)。
numslots属性:记录位数组值为1的数量,即当前节点处理槽的数量。
传播节点的槽指派信息:
除了记录自己的槽指派信息外,还会告知其他节点自己处理哪些槽,将自己的slots数组发送给其他节点。
当其他节点收到消息后,将clusterState.nodes字典保存的节点信息对应节点的slots属性更新。
因为每个节点都会发送自己的,并且接收其他节点的消息,所以,集群每个节点都会知道16384个槽分给哪些节点。
记录集群所有槽指派信息:
clusterState结构的slots数组记录16384个槽的指派信息:
数组每一项是一个指向clusterNode的指针:
- 如果slots[i]指向null,说明槽i没有指派
- 如果slots[i]指向一个clusterNode,说明已经指派给该节点了
为什么需要这么保存呢?
因为只将槽指派信息保存在各个节点的clusterNode.slots数组里,会出现一些无法高效解决的问题:
- 如果只在clusterNode.slots保存,那么为了知道槽i是否被指派、以及槽i指派给哪些节点,需要遍历clusterState.nodes字典的所有节点,找到后再检查该节点的slots数组,直到找到负责处理槽i的节点为止。这个过程时间复杂度为O(N),N为nodes字典节点个数。
- 而通过将槽指派信息保存在clusterState.slots数组里,检查某个槽是否被指派、指派给哪个节点的时间复杂度为O(1),只需访问clusterState.slots[i]即可。
但使用clusterNode的slots数组记录单个节点的槽指派信息还是有必要的:
- 每次将节点A的槽指派信息传播给其他节点时,如果只有clusterState.slots的话,需要遍历数组,记录A节点处理哪些槽,然后才能发送A的槽指派信息;不如用clusterNode.slots记录单个节点的槽指派信息快。
clusterState.slots数组记录所有槽指派
clusterNode.slots 记录当前节点的槽指派
在集群中执行命令
当所有槽都被指派完后,集群进入上线状态,客户端可以向集群的节点发送命令了。
当客户端发送与键有关的命令时,节点会先计算数据库键属于哪个槽,并检查这个槽是否分派给自己:
- 如果键所在的槽刚好分派给当前节点,那么直接执行命令;
- 如果键所在的槽没有分派给当前节点,那么节点会返回一个moved错误,并重定向到正确的槽执行该命令。
计算键属于哪个槽
算法:
CRC16(key) & 16384
crc16用于计算key的校验和,& 16384 用于计算出一个位于0-16384之间的整数作为key的槽号。cluster keyslot <key>
命令可查看key位于哪个槽
重新分片
集群分片操作将任意数量已经指派给某个节点的槽改为指派给另一节点,包括对应的键值对。且不需要下线。
重新分片原理:
由redis集群管理软件redis-trib负责,redis-trib向源节点和目标节点发送命令实现:
- rt对目标节点发送
cluster setslot <slot> importing <source_id>
,让目标节点准备好从源节点导入属于slot的键值对。 - rt对源节点发送
cluster setslot <slot> migrating <targit_id>
,让源节点准备好将slot的键值对迁移到目标节点。 - rt对源节点发送
cluster getkeysinslot <slot> <count>
命令,获取最多count个属于slot槽的键值对的key。 - 对步骤3获取的每个键名,rt都向源节点发送一个
migrate <target_ip> <target_port> <key_name> 0 <timeout>
命令,将被选中的键原子的迁移到目标节点。 - 重复3和4,直到源节点所有属于槽slot的键值对都被转移到目标节点。每次迁移过程如图
- rt向集群任意节点发送
cluster setslot <slot> node <target_id>
命令,使得集群所有节点都知道槽slot指派给了目标节点。
图17-25 对槽slot重新分片的过程:
cluster setslot <slot> importing <source_id>
命令的实现
clusterState结构的importing_slots_from数组记录了当前节点正在从其他节点导入的槽:
clusterNode *importing_slots_from[CLUSTER_SLOTS];
如果importing_slots_from[i]
的值不为null,而是指向clusterNode结构,那么表示当前节点正在从clusterNode代表的节点导入槽i。
重新分片时发送cluster setslot <i> importing <source_ip>
命令,将目标节点的importing_slots_from[i]
值设置为source_id代表的节点clusterNode结构。
举个例子:
cluster setslot <slot> migrating <target_id>
命令的实现
clusterState结构的migrating_slots_to数组记录了当前节点正在迁移到其他节点的槽:
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
如果migrating_slots_to[i]
的值不为null,而是指向clusterNode结构,那么表示当前节点正在将槽i迁移至clusterNode代表的节点。
重新分片时发送cluster setslot <i> migrating <target_id>
命令,将目标节点的migrating_slots_to[i]
值设置为target_id代表的节点clusterNode结构。
复制与故障转移
redis集群节点分主从,主节点用于处理槽,从节点复制主节点,并在主下线后代替主节点继续执行命令。
如果一个主节点有多个从节点,在主节点下线后,其他主节点会在下线主节点的从节点中选出一个做为新的主节点,其他从节点复制新的主节点。如果下线的主节点上线后,将成为新的主节点的从节点。
故障检测:
如果集群半数以上处理槽的主节点都将某个主节点报告为疑似下线,那么该主节点被标记为已下线fail。
故障转移:
当一个从节点发现复制的主节点进入下线状态,将进行故障转移:
- 选举新的主节点
- 被选中的从节点会执行
slaveof no one
命令,成为新的主节点。 - 新主节点会撤销对已下线主节点的槽指派,将所有槽指派个自己。
- 新主节点向集群广播一条pong 消息,这条消息会让其他节点知道这个节点已经成为新的主节点,并且已经接管了所有槽。
- 新的主节点开始接收和自己槽有关的命令,故障转移成功。
选举新主节点(raft算法):
- 集群的配置纪元是一个自增计数器,初始为0.
- 当某个节点开始一次故障转移操作时,集群配置纪元加1.
- 对于每个配置纪元,每个主节点都有一次投票机会,而第一个向主节点要求投票的从节点,将获取主节点的投票。
- 当从节点发现自己的主节点下线后,会向集群广播一条clustermsg_type_failover_auth_request消息,要求所有收到这个消息、并且具有投票权的主节点向这个节点投票。
- 如果一个主节点具有投票权,并且尚未投给其他从节点,那么主节点将向要求他投票的从节点返回一条clustermsg_type_failover_auth_ack消息,表示支持这个从节点成为新的主节点。
- 每个参与选举的从节点都会接收clustermsg_type_failover_auth_ack消息,统计多少条表示获得多少主节点的支持。
- 如果集群有个N个有投票权的主节点,当一个从节点收到的支持票数大于一半以上(即大于N/2+1)时,这个节点将成为新主节点。
- 在每个配置纪元里,每个主节点只有一次投票权,大于等于N/2+1个支持票的从节点只有一个。
- 如果在一个配置纪元里没有从节点收集到足够多的票时,集群会进入下一个配置纪元,重新选举。直到成功选举。
和选举头领sentinel方法一样。