Redis Cluster 原理相關說明

背景

      之前寫的 Redis Cluster部署、管理和測試 和 Redis 5.0 redis-cli --cluster help說明 已經比較詳細的介紹了如何安裝和維護Cluster。但關于Cluster各個節點的通信和原理沒有說明,為了方便自己以后查閱,先做些記錄。順便對Redis 4.0和5.0的相關特性也做下說明。

Redis 4.0 新功能說明

Redis4.0版本增加了很多新的特性,如:

1 Redis Memeory Command:詳細分析內存使用情況,內存使用診斷,內存碎片回收;
2 PSYNC2:解決failover和從實例重啟不能部分同步;PSYNC3已經路上了;
3 LazyFree: 再也不用怕big key的刪除引起集群故障切換;
4 LFU: 支持近似的LFU內存淘汰算法;
5 Active Memory Defragmentation:內存碎片回收效果很好(實驗階段);
6 Modules: Redis成為更多的可能(覺得像mongo/mysql引入engine的階段);

一、Lazyfree

redis-4.0帶來的Lazyfree機制可以避免del,flushdb/flushall,rename等命令引起的redis-server阻塞,提高服務穩定性。

在redis-4.0之前,redis執行del命令會在釋放掉key的所有內存以后才會返回OK,這在key比較大的時候(比如說一個hash里頭有1000W條數據),其他連接可能要等待很久。為了兼容已有的del語義,redis-4.0引入unlink命令,效果以及用法和del完全一樣,但內存釋放動作放到后臺線程中執行。

UNLINK key [key ...]

② flushdb/flushall

flushdb/flushall在redis-4.0中新引入了選項,可以指定是否使用Lazyfree的方式來清空整個內存。

FLUSHALL [ASYNC]
FLUSHDB [ASYNC]

③ rename

執行 rename oldkey newkey 時,如果newkey已經存在,redis會先刪除,這也會引發上面提到的刪除大key問題。

lazyfree-lazy-server-del yes/no

④ 其他場景

某些用戶對數據設置過期時間,依賴redis的淘汰機制去刪除已經過期的數據,這同樣也存在上面提到的問題,淘汰某個大key會導致進程CPU出現抖動,redis-4.0提供了兩個配置,可以讓redis在淘汰或者逐出數據時也使用lazyfree的方式。

lazyfree-lazy-eviction yes/no
lazyfree-lazy-expire yes/no

二、memory

redis-4.0之前只能通過info memory來了解redis內部有限的內存信息,4.0提供了memory命令,幫助用戶全面了解redis的內存狀態。

127.0.0.1:6379> memory help
1) "MEMORY DOCTOR                        - Outputs memory problems report"
2) "MEMORY USAGE <key> [SAMPLES <count>] - Estimate memory usage of key"
3) "MEMORY STATS                         - Show memory usage details"
4) "MEMORY PURGE                         - Ask the allocator to release memory"
5) "MEMORY MALLOC-STATS                  - Show allocator internal stats"

① memory usage

usage子命令可以查看某個key在redis內部實際占用多少內存,這里有兩點需要說明:

1.  不光key, value需要占用內存,redis管理這些數據還需要一部分內存

2. 對于hash, list, set, sorted set這些類型,結果是采樣計算的,可以通過SAMPLES 來控制采樣數量

② memory stats

在redis 4.0之前,我們只能通過info memory查看redis實例的內存大體使用狀況;而內存的使用細節,比如expire的消耗,client output buffer, query buffer等是很難直觀顯示的。 memory stats命令就是為展現redis內部內存使用細節。

③ memory doctor

主要用于給一些診斷建議,提前發現潛在問題。

④ memory purge

memory purge命令通過調用jemalloc內部命令,進行內存釋放,盡量把redis進程占用但未有效使用內存,即常說的內存碎片釋放給操作系統。只適用于使用jemalloc作為allocator的實例。

⑤ memory malloc-stats

用于打印allocator內部的狀態,目前只支持jemalloc。

三、LFU

redis-4.0新增了 allkey-lfu 和 volatile-lfu 兩種數據逐出策略,同時還可以通過object命令來獲取某個key的訪問頻度。

object freq user_key

基于LFU機制,用戶可以使用 scan + object freq 來發現熱點key,當然redis也一起發布了更好用的 :

redis-cli --hotkeys

四、psync2

Redis4.0新特性psync2(partial resynchronization version2)部分重新同步(partial resync)增加版本;主要解決Redis運維管理過程中,從實例重啟和主實例故障切換等場景帶來的全量重新同步(full resync)問題。

五、持久化

redis有兩種持久化的方式——RDB和AOF其中RDB是一份內存快照AOF則為可回放的命令日志他們兩個各有特點也相互獨立。4.0開始允許使用RDB-AOF混合持久化的方式結合了兩者的優點通過aof-use-rdb-preamble配置項可以打開混合開關。

Redis 5.0 新功能說明

Redis5.0版是Redis產品的重大版本發布,它的最新特點: 

 1 新的流數據類型(Stream data type) https://redis.io/topics/streams-intro
 2 新的 Redis 模塊 API:定時器、集群和字典 API(Timers, Cluster and Dictionary APIs)
 3 RDB 增加 LFU 和 LRU 信息
 4 集群管理器從 Ruby (redis-trib.rb) 移植到了redis-cli 中的 C 語言代碼
 5 新的有序集合(sorted set)命令:ZPOPMIN/MAX 和阻塞變體(blocking variants)
 6 升級 Active defragmentation 至 v2 版本
 7 增強 HyperLogLog 的實現
 8 更好的內存統計報告
 9 許多包含子命令的命令現在都有一個 HELP 子命令
10 客戶端頻繁連接和斷開連接時,性能表現更好
11 許多錯誤修復和其他方面的改進
12 升級 Jemalloc 至 5.1 版本
13 引入 CLIENT UNBLOCK 和 CLIENT ID
14 新增 LOLWUT 命令 http://antirez.com/news/123
15 在不存在需要保持向后兼容性的地方,棄用 "slave" 術語
16 網絡層中的差異優化
17 Lua 相關的改進
18 引入動態的 HZ(Dynamic HZ) 以平衡空閑 CPU 使用率和響應性
19 對 Redis 核心代碼進行了重構并在許多方面進行了改進

Redis Cluster總覽

一、簡介

官方文檔Cluster Spec中,作者詳細介紹了Redis集群為什么要設計成現在的樣子。最核心的目標有三個:

1 性能:增加集群功能后不能對性能產生太大影響,所以Redis采取了P2P而非Proxy方式、異步復制、客戶端重定向等設計。
2 水平擴展:文檔中稱可以線性擴展到1000結點。
3 可用性:在Cluster推出之前,可用性要靠Sentinel保證。有了集群之后也自動具有了Sentinel的監控和自動Failover能力。

如果需要全面的了解,那一定要看官方文檔Cluster Tutorial

Redis Cluster是一個高性能高可用的分布式系統。由多個Redis實例組成的整體,數據按照Slot存儲分布在多個Redis實例上,通過Gossip協議來進行節點之間通信。功能特點如下:

1 所有的節點相互連接
2 集群消息通信通過集群總線通信,集群總線端口大小為客戶端服務端口+10000固定值)
3 節點與節點之間通過二進制協議進行通信
4 客戶端和集群節點之間通信和通常一樣,通過文本協議進行
5 集群節點不會代理查詢
6 數據按照Slot存儲分布在多個Redis實例上
7 集群節點掛掉會自動故障轉移
8 可以相對平滑擴/縮容節點

關于Cluster相關的源碼可以見:src/cluster.c 和 src/cluster.h

二、通信 

2.1 CLUSTER MEET

需要組建一個真正的可工作的集群,我們必須將各個獨立的節點連接起來,構成一個包含多個節點的集群。連接各個節點的工作使用CLUSTER MEET命令來完成。

CLUSTER MEET <ip> <port>

CLUSTER MEET命令實現:

1 節點A會為節點B創建一個 clusterNode 結構,并將該結構添加到自己的 clusterState.nodes 字典里面。
2 節點A根據CLUSTER MEET命令給定的IP地址和端口號,向節點B發送一條MEET消息。
3 節點B接收到節點A發送的MEET消息,節點B會為節點A創建一個clusterNode結構,并將該結構添加到自己的clusterState.nodes字典里面。
4 節點B向節點A返回一條PONG消息。
5 節點A將受到節點B返回的PONG消息,通過這條PONG消息節點A可以知道節點B已經成功的接收了自己發送的MEET消息。
6 節點A將向節點B返回一條PING消息。
7 節點B將接收到的節點A返回的PING消息,通過這條PING消息節點B可以知道節點A已經成功的接收到了自己返回的PONG消息,握手完成。
8 節點A會將節點B的信息通過Gossip協議傳播給集群中的其他節點,讓其他節點也與節點B進行握手,最終,經過一段時間后,節點B會被集群中的所有節點認識。

2.2 消息處理 clusterProcessPacket

1 更新接收消息計數器
2 查找發送者節點并且不是handshake節點
3 更新自己的epoch和slave的offset信息
4 處理MEET消息,使加入集群
5 從goosip中發現未知節點,發起handshake
6 對PING,MEET回復PONG
7 根據收到的心跳信息更新自己clusterState中的master-slave,slots信息
8 對FAILOVER_AUTH_REQUEST消息,檢查并投票
9 處理FAIL,FAILOVER_AUTH_ACK,UPDATE信息

2.3 定時任務clusterCron

1 對handshake節點建立Link,發送Ping或Meet
2 向隨機節點發送Ping
3 如果是從查看是否需要做Failover
4 統計并決定是否進行slave的遷移,來平衡不同master的slave數
5 判斷所有pfail報告數是否過半數

2.4 心跳數據

集群中的節點會不停(每幾秒)的互相交換ping、pong包,ping和pong包具有相同的結構,只是類型不同,ping、pong包合在一起叫做心跳包。通常節點會發送ping包并接收接收者返回的pong包,不過這也不是絕對,節點也有可能只發送pong包,而不需要讓接收者發送返回包。

節點間通過ping保持心跳以及進行gossip集群狀態同步,每次心跳時,節點會帶上多個clusterMsgDataGossip消息體,經過多次心跳,該節點包含的其他節點信息將同步到其他節點。

ping和pong包的內容可以分為header和gossip消息兩部分:

  • 發送消息頭信息Header
  1. 所負責slots的信息
  2. 主從信息
  3. ip, port信息
  4. 狀態信息

包含的信息:

1 NODE ID是一個160bit的偽隨機字符串,它是節點在集群中的唯一標識
2 currentEpoch和configEpoch字段
3 node flag,標識節點是master還是slave,另外還有一些其他的標識位,如PFAIL和FAIL。
4 節點提供服務的hash slot的bitmap
5 發送者的TCP端口
6 發送者認為的集群狀態(down or ok)
7 如果是slave,則包含master的NODE ID
  • 發送其他節點Gossip信息。包含了該節點認為的其他節點的狀態,不過不是集群的全部節點(隨機)
  1. ping_sent, pong_received
  2. ip, port信息
  3. 狀態信息,比如發送者認為該節點已經不可達,會在狀態信息中標記其為PFAIL或FAIL

包含的信息:

1 NODE ID
2 節點的IP和端口
3 NODE flags

clusterMsg結構的currentEpoch、sender、myslots等屬性記錄了發送者自身的節點信息,接收者會根據這些信息,在自己的clusterState.nodes字典里找到發送者對應的結構,并對結構進行更新。

Redis集群中的各個節點通過ping來心跳,通過Gossip協議來交換各自關于不同節點的狀態信息,其中Gossip協議由MEET、PING、PONG三種消息實現,這三種消息的正文都由兩個clusterMsgDataGossip結構組成。

每次發送MEET、PING、PONG消息時,發送者都從自己的已知節點列表中隨機選出兩個節點(可以是主節點或者從節點),并將這兩個被選中節點的信息分別保存到兩個結構中。當接收者收到消息時,接收者會訪問消息正文中的兩個結構,并根據自己是否認識clusterMsgDataGossip結構中記錄的被選中節點進行操作:

1 如果被選中節點不存在于接收者的已知節點列表,那么說明接收者是第一次接觸到被選中節點,接收者將根據結構中記錄的IP地址和端口號等信息,與被選擇節點進行握手。
2 如果被選中節點已經存在于接收者的已知節點列表,那么說明接收者之前已經與被選中節點進行過接觸,接收者將根據clusterMsgDataGossip結構記錄的信息,對被選中節點對應的clusterNode結構進行更新。

2.5 數據結構

clusterNode 結構保存了一個節點的當前信息, 如節點的創建時間、節點的名字、節點當前的配置紀元、節點的 IP 和端口等:

1 slots:位圖,由當前clusterNode負責的slot為1
2 salve, slaveof:主從關系信息
3 ping_sent, pong_received:心跳包收發時間
4 clusterLink *link:節點間的連接
5 list *fail_reports:收到的節點不可達投票

clusterState 結構記錄了在當前節點的集群目前所處的狀態:

1 myself:指針指向自己的clusterNode
2 currentEpoch:當前節點的最大epoch,可能在心跳包的處理中更新
3 nodes:當前節點記錄的所有節點的字典,為clusterNode指針數組
4 slots:slot與clusterNode指針映射關系
5 migrating_slots_to,importing_slots_from:記錄slots的遷移信息
6 failover_auth_time,failover_auth_count,failover_auth_sent,failover_auth_rank,failover_auth_epoch:Failover相關信息

clusterLink 結構保存了連接節點的有關信息, 比如套接字描述符, 輸入緩沖區和輸出緩沖區。

三、數據分布及槽信息

3.1 槽(slot)概念

Redis Cluster中有一個16384長度的槽的概念,他們的編號為0、1、2、3……16382、16383。這個槽是一個虛擬的槽,并不是真正存在的。正常工作的時候,Redis Cluster中的每個Master節點都會負責一部分的槽,當有某個key被映射到某個Master負責的槽,那么這個Master負責為這個key提供服務,至于哪個Master節點負責哪個槽,這是可以由用戶指定的,也可以在初始化的時候自動生成。在Redis Cluster中,只有Master才擁有槽的所有權,如果是某個Master的slave,這個slave只負責槽的使用,但是沒有所有權。

3.2 數據分片

在Redis Cluster中,擁有16384個slot,這個數是固定的,存儲在Redis Cluster中的所有的鍵都會被映射到這些slot中。數據庫中的每個鍵都屬于這 16384 個哈希槽的其中一個,集群使用公式 CRC16(key) % 16384 來計算鍵 key 屬于哪個槽,其中 CRC16(key) 用于計算鍵 key 的 CRC16 校驗和,集群中的每個節點負責處理一部分哈希槽。

3.3 節點的槽指派信息

clusterNode結構的slots屬性和numslot屬性記錄了節點負責處理那些槽:

struct clusterNode {

           //

           unsignedchar slots[16384/8];

};

Slots屬性是一個二進制位數組(bitarray),這個數組的長度為16384/8=2048個字節,共包含16384個二進制位。Master節點用bit來標識對于某個槽自己是否擁有。比如對于編號為1的槽,Master只要判斷序列的第二位(索引從0開始)是不是為1即可。時間復雜度為O(1)。

3.4 集群所有槽的指派信息

通過將所有槽的指派信息保存在clusterState.slots數組里面,程序要檢查槽i是否已經被指派,又或者取得負責處理槽i的節點,只需要訪問clusterState.slots[i]的值即可,復雜度僅為O(1)。

3.5 請求重定向

由于每個節點只負責部分slot,以及slot可能從一個節點遷移到另一節點,造成客戶端有可能會向錯誤的節點發起請求。因此需要有一種機制來對其進行發現和修正,這就是請求重定向。有兩種不同的重定向場景:

 a) MOVED錯誤

  • 請求的key對應的槽不在該節點上,節點將查看自身內部所保存的哈希槽到節點 ID 的映射記錄,節點回復一個 MOVED 錯誤。

  • 需要客戶端進行再次重試。

 b) ASK錯誤

  • 請求的key對應的槽目前的狀態屬于MIGRATING狀態,并且當前節點找不到這個key了,節點回復ASK錯誤。ASK會把對應槽的IMPORTING節點返回給你,告訴你去IMPORTING的節點查找。

  • 客戶端進行重試 首先發送ASKING命令,節點將為客戶端設置一個一次性的標志(flag),使得客戶端可以執行一次針對 IMPORTING 狀態的槽的命令請求,然后再發送真正的命令請求。

  • 不必更新客戶端所記錄的槽至節點的映射。

四、數據遷移

當槽x從Node A向Node B遷移時,Node A和Node B都會有這個槽x,Node A上槽x的狀態設置為MIGRATING,Node B上槽x的狀態被設置為IMPORTING。

MIGRATING狀態

  1. 如果key存在則成功處理

  2. 如果key不存在,則返回客戶端ASK,客戶端根據ASK首先發送ASKING命令到目標節點,然后發送請求的命令到目標節點

  3. 當key包含多個:

    1. 如果都存在則成功處理

    2. 如果都不存在,則返回客戶端ASK

    3. 如果一部分存在,則返回客戶端TRYAGAIN,通知客戶端稍后重試,這樣當所有的key都遷移完畢的時候客戶端重試請求的時候回得到ASK,然后經過一次重定向就可以獲取這批鍵

  4. 此時不刷新客戶端中node的映射關系

IMPORTING狀態

  1. 如果key不在該節點上,會被MOVED重定向,刷新客戶端中node的映射關系

  2. 如果是ASKING則命令會被執行,key不在遷移的節點已經被遷移到目標的節點

  3. Key不存在則新建

Key遷移的命令:

1 DUMP:在源(migrate)上執行
2 RESTORE:在目標(importing)上執行
3 DEL:在源(migrate)上執行 

經過上面三步可以將鍵遷移,然后再將處于MIGRATING和IMPORTING狀態的槽變為常態,完成整個重新分片的過程,具體的信息可見Redis Cluster部署、管理和測試 。

4.1 讀寫請求

槽里面的key還未遷移,并且槽屬于遷移中。

假如槽x在Node A,需要遷移到Node B上,槽x的狀態為migrating,其中的key1還沒輪到遷移。此時訪問key1則先計算key1所在的Slot,存在key1則直接返回。

4.2 MOVED請求

槽里面的key已經遷移過去,并且槽屬于遷移完。

假如槽x在Node A,需要遷移到Node B上,遷移完成。此時訪問key1則先計算key1所在的Slot,因為已經遷移至Node B上,Node A上不存在,則返回 moved slotid IP:PORT,再根據返回的信息去Node B訪問key1

4.3 ASK請求

槽里面的key已經遷移完,并且槽屬于遷移中的狀態。

假如槽x在Node A,需要遷移到Node B上,遷移完成,但槽x的狀態為migrating。此時訪問key1則先計算key1所在的Slot,不存在key1則返回ask slotid IP:PORT,再根據ask返回的信息發送asking請求到Node B,沒問題后則最后再去Node B上訪問key1。

五、通信故障

5.1    故障檢測

集群中的每個節點都會定期地向集群中的其他節點發送PING消息,以此交換各個節點狀態信息,檢測各個節點狀態:在線狀態、疑似下線狀態PFAIL、已下線狀態FAIL。

當主節點A通過消息得知主節點B認為主節點D進入了疑似下線(PFAIL)狀態時,主節點A會在自己的clusterState.nodes字典中找到主節點D所對應的clusterNode結構,并將主節點B的下線報告(failure report)添加到clusterNode結構的fail_reports鏈表中。

struct clusterNode {
           //...
           //記錄所有其他節點對該節點的下線報告
           list*fail_reports;     
           //...
};

如果集群里面,半數以上的主節點都將主節點D報告為疑似下線,那么主節點D將被標記為已下線(FAIL)狀態,將主節點D標記為已下線的節點會向集群廣播主節點D的FAIL消息,所有收到FAIL消息的節點都會立即更新nodes里面主節點D狀態標記為已下線。

將 node 標記為 FAIL 需要滿足以下兩個條件:

1 有半數以上的主節點將 node 標記為 PFAIL 狀態。
2 當前節點也將 node 標記為 PFAIL 狀態。

5.2    多個從節點選主

選新主的過程基于Raft協議選舉方式來實現的:

1 當從節點發現自己的主節點進行已下線狀態時,從節點會廣播一條CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST消息,要求所有收到這條消息,并且具有投票權的主節點向這個從節點投票
2 如果一個主節點具有投票權,并且這個主節點尚未投票給其他從節點,那么主節點將向要求投票的從節點返回一條,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK消息,表示這個主節點支持從節點成為新的主節點
3 每個參與選舉的從節點都會接收CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK消息,并根據自己收到了多少條這種消息來統計自己獲得了多少主節點的支持
4 如果集群里有N個具有投票權的主節點,那么當一個從節點收集到大于等于集群N/2+1張支持票時,這個從節點就成為新的主節點
5 如果在一個配置紀元沒有從能夠收集到足夠的支持票數,那么集群進入一個新的配置紀元,并再次進行選主,直到選出新的主節點為止

5.3    故障轉移

錯誤檢測用于識別集群中的不可達節點是否已下線,如果一個master下線,會將它的slave提升為master,在gossip消息中,NODE flags的值包括兩種PFAIL和FAIL。

PFAIL flag:

如果一個節點發現另外一個節點不可達的時間超過NODE_TIMEOUT ,則會將這個節點標記為PFAIL,即Possible failure(可能下線)。節點不可達是說一個節點發送了ping包,但是等待了超過NODE_TIMEOUT時間仍然沒有收到回應(NODE_TIMEOUT必須大于一個網絡包來回的時間)。

FAIL flag:

PFAIL標志只是一個節點本地的信息,為了使slave提升為master,需要將PFAIL升級為FAIL。PFAIL升級為FAIL需要滿足一些條件:

1 A節點將B節點標記為PFAIL
2 A節點通過gossip消息收集其他大部分master節點標識的B節點的狀態
3 大部分master節點在NODE_TIMEOUT * FAIL_REPORT_VALIDITY_MULT時間段內,標識B節點為PFAIL或FAIL

如果滿足以上條件,A節點會將B節點標識為FAIL并且向所有節點發送B節點FAIL的消息。收到消息的節點也都會將B標為FAIL。

注意:FAIL狀態是單向的,只能從PFAIL升級為FAIL,而不能從FAIL降為PFAIL。

清除FAIL狀態:

  • 節點重新可達,并且是slave節點
  • 節點重新可達,并且是master節點,但是不提供任何slot服務
  • 節點重新可達,并且是master節點,但是長時間沒有slave被提升為master來頂替它

PFAIL提升到FAIL使用的是一種弱協議:

  • 節點收集的狀態不在同一時間點,會丟棄時間較早的報告信息,但是也只能保證節點的狀態在一段時間內大部分master達成了一致
  • 檢測到一個FAIL后,需要通知所有節點,但是沒有辦法保證每個節點都能成功收到消息
因為是弱協議,Redis Cluster只要求所有節點對某個節點的狀態最終保持一致。如果大部分master認為某個節點FAIL,那么最終所有節點都會將其標為FAIL。而如果只有一小部分master節點認為某個節點FAIL,slave并不會被提升為master,因此,FAIL狀態將會被清除。

當從節點發現自己的主節點變為已下線(FAIL)狀態時,便嘗試進Failover,成為新的主。以下是故障轉移的執行步驟:

1下線主節點的所有從節點中選中一個從節點
2 被選中的從節點執行SLAVEOF NO NOE命令,成為新的主節點
3 新的主節點會撤銷所有對已下線主節點的槽指派,并將這些槽全部指派給自己
4 新的主節點對集群進行廣播PONG消息,告知其他節點已經成為新的主節點
5 新的主節點開始接收和處理槽相關的請求

相關結構

clusterNode

typedef struct clusterNode {
    mstime_t ctime;  // 該node創建時間
    char name[CLUSTER_NAMELEN]; // 40位的node名字
     // node 狀態標識通過投CLUSTER_NODE 定義。
     // 包括  maser slave self pfail fail handshake noaddr meet migrate_to null_name 這些狀態
    int flags;     
    // 本節點最新epoch
    uint64_t configEpoch; 
    // 當前node負責的slot 通過bit表示
    unsigned char slots[CLUSTER_SLOTS/8]; 
    int numslots;   
    int numslaves; 
    struct clusterNode **slaves; 
    // 如果該node為從 則指向master節點
    struct clusterNode *slaveof; 
    mstime_t ping_sent;      /* Unix time we sent latest ping */
    mstime_t pong_received;  /* Unix time we received the pong */
    mstime_t fail_time;      /* Unix time when FAIL flag was set */
    mstime_t voted_time;     /* Last time we voted for a slave of this master */
    mstime_t repl_offset_time;  /* Unix time we received offset for this node */
    mstime_t orphaned_time;     /* Starting time of orphaned master condition */
    long long repl_offset;      /* Last known repl offset for this node. */
    char ip[NET_IP_STR_LEN];  /* Latest known IP address of this node */
    int port;                   /* Latest known port of this node */
    
    clusterLink *link;          
    // 將該節點標記為失敗的node list
    // 節點收到gossip消息后,如果gossip里標記該節點為pfail則加入改list
    // 比如:節點a向b發送gossip,消息包含了 c 節點且出于pfail,則a將被加入c的link。
    list *fail_reports;         
} clusterNode;
View Code

clusterMsgData:節點間通訊的數據結構 包含了 ping、fail、publish、update四種類型

struct {
       /* Array of N clusterMsgDataGossip structures */
       clusterMsgDataGossip gossip[1];
   } ping;
View Code

節點間通過ping保持心跳以及進行gossip集群狀態同步,每次心跳時,節點會帶上多個clusterMsgDataGossip(其他節點)消息體,經過多次心跳,該節點包含的其他節點信息將同步到其他節點。

clusterState:定義了完整的集群信息

struct clusterState{
   // 集群最新的epoch,為64位的自增序列 
   uint64_t currentEpoch;
   // 包含的所有節點信息
   dict *nodes; 
   // 每個slot所屬于的節點,包括處于migrating和importinng狀態的slot
   clusterNode *migrating_slots_to[CLUSTER_SLOTS];
   clusterNode *importing_slots_from[CLUSTER_SLOTS];
   clusterNode *slots[CLUSTER_SLOTS]; 
   // 當前節點所包含的key 用于在getkeysinslot的時候返回key信息
   zskiplist *slots_to_keys;
   ...   
}
View Code

redis啟動,判斷是否允許cluster模式,如果允許,則調用clusterInit進行cluster信息的初始化。clusterState被初始化為初始值。在后續節點meet及ping過程逐步更新clusterState信息。

Send Ping:

節點創建成功后,節點會向已知的其他節點發送ping消息保持心跳,ping消息體同時會攜帶已知節點的信息,并通過gossip同步到集群的其他節點。

node的ping由clusterCron負責調用,服務啟動時,在serverCron內部會注冊clusterCron,該函數每秒執行10次,在clusterCron內部,維護著static變量iteration記錄該函數被執行的次數:通過if (!(iteration % 10)){}的判斷,使得各節點每秒發送一次心跳。ping節點選擇的代碼邏輯如下:

clusterCron:

void clusterCron(void)
{
    // ...
    // 如果沒有設置handshake超時,則默認超時未1s
    handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000)
        handshake_timeout = 1000;

    // 遍歷nodes列表
    while ((de = dictNext(di)) != NULL)
    {
        //  刪除handshake超時的節點
        if (nodeInHandshake(node) && now - node->ctime > handshake_timeout)
        {
            clusterDelNode(node);
            continue;
        }
        // 如果該節點的link為空,則為該節點新建連接,并且初始化ping初始時間
        if (node->link == NULL)
        {
        // 如果該節點處于meet狀態,則直接發送meet讓節點加入集群
        // 否則發送向該節點發送ping
        clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
        }
        // 函數每被調動10次,則發送一次ping,因此ping間隔為1s
        if (!(iteration % 10))
        {
        int j;
        for (j = 0; j < 5; j++)
        {
            // 隨機選取節點并過濾link為空的以及self
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);
            if (this->link == NULL || this->ping_sent != 0)
                continue;
            if (this->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE))
                continue;
            
            // 挑選距離上次pong間隔最久的節點
            // redis會盡量選擇距離上次ping間隔最久的節點,
            // 以此防止隨機不均勻導致某些節點一直收不到ping
            if (min_pong_node == NULL || min_pong > this->pong_received)
            {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
    }
}
View Code

發送ping的時候,會優先給新加入的節點發送ping,其實再選擇最久沒被更新的節點,通過對舊節點選擇的加權,盡可能地保證了集群最新狀態的一致。

每次ping請求,node會從已知的nodes列表里隨機選取n個節點(n=1/10*len(nodes)&& n>=3),一段時間后,該節點已知的nodes將被同步到集群的其他節點,集群狀態信息達成最終一致。具體實現代碼如下(只列出部分代碼,完整代碼見cluster.c/clusterSendPing)

clusterSendPing:

void clusterSendPing(clusterLink *link, int type)
{
   // 選取1/10 的節點數并且要求大于3.
   // 1/10是個魔數,為啥是1/10在源碼里有解釋
   int freshnodes = dictSize(server.cluster->nodes) - 2;   
    wanted = floor(dictSize(server.cluster->nodes) / 10);
   if (wanted < 3)
       wanted = 3;
   if (wanted > freshnodes)
       wanted = freshnodes;
   while (freshnodes > 0 && gossipcount < wanted && maxiterations--)
   {
   // 通過隨機函數隨機選擇一個節點,保證所有節點盡可能被同步到整個集群
   dictEntry *de = dictGetRandomKey(server.cluster->nodes);
   clusterNode *this = dictGetVal(de);
   // 為了保證失敗的節點盡可能快地同步到集群其他節點,
   // 優先選取處于pfail以及fail狀態的節點
   if (maxiterations > wanted * 2 &&
   !(this->flags & (CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL)))
   continue;
   }
   // 如果被選中的節點處于
   // 1.handshake 并且noaddr狀態
   // 2.其他節點沒有包含該節點的信息,并且該節點沒有擁有slot
   // 則跳過該節點并且將可用的節點數減1,以較少gossip數據同步的開銷
   if (this->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_NOADDR) ||
   (this->link == NULL && this->numslots == 0))
   {
   freshnodes--; /* Tecnically not correct, but saves CPU. */
   continue;
   }
}
View Code

通過隨機選取合適數量的節點,以及對節點狀態的過濾,保證了盡可能快的達成最終一致性的同時,減少gossip的網絡開銷。

cluster監聽cluster端口,并通過clusterAcceptHandler接受集群節點發起的連接請求,通過aeCreateFileEvent將clusterReadHandler注冊進事件回調,讀取node發送的數據包。clusterReadHandler讀取到完整的數據包后,調用clusterProcessPacket處理包請求。clusterProcessPacket包含收到數據包后完整的處理邏輯。

clusterProcessPacket:

int clusterProcessPacket(clusterLink *link)
{
   // 判斷是否為ping請求并校驗數據包長度
   if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
       type == CLUSTERMSG_TYPE_MEET)
   {
       uint16_t count = ntohs(hdr->count);
       uint32_t explen; /* expected length of this packet */

       explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
       explen += (sizeof(clusterMsgDataGossip) * count);
       if (totlen != explen)
           return 1;
   }   
   // ...
   
   // 是否為已知節點
   sender = clusterLookupNode(hdr->sender);
   if (sender && !nodeInHandshake(sender))
   {
       //  比較epoch并更新為最大的epoch
       if (senderCurrentEpoch > server.cluster->currentEpoch)
           server.cluster->currentEpoch = senderCurrentEpoch;
       /* Update the sender configEpoch if it is publishing a newer one. */
       if (senderConfigEpoch > sender->configEpoch)
       {
           sender->configEpoch = senderConfigEpoch;
           clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
                                CLUSTER_TODO_FSYNC_CONFIG);
       }
   }
   // 回復pong數據包
   clusterSendPing(link, CLUSTERMSG_TYPE_PONG);

   // 獲取gossip消息并處理gossip請求
   if (sender)
   clusterProcessGossipSection(hdr, link);
}
View Code

clusterProcessGossipSection 讀取攜帶的gossip node內容,并判斷這些node是否failover:

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link)
{
    // ...


    if (flags & (CLUSTER_NODE_FAIL | CLUSTER_NODE_PFAIL))
{
    if (clusterNodeAddFailureReport(node, sender))
    {
        serverLog(LL_VERBOSE,
                  "Node %.40s reported node %.40s as not reachable.",
                  sender->name, node->name);
    }
    markNodeAsFailingIfNeeded(node);
}
else
{
    // 如果該node并非出于fail狀態,則從fail link里刪除該node
    if (clusterNodeDelFailureReport(node, sender))
    {
        serverLog(LL_VERBOSE,
                  "Node %.40s reported node %.40s is back online.",
                  sender->name, node->name);
    }
}
}
View Code
int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender)
{
    while ((ln = listNext(&li)) != NULL)
    {
        fr = ln->value;
        if (fr->node == sender)
            break;
    }
    if (!ln)
    return 0;
    // 如果之前被標記為失敗,則從失敗list里刪除
    listDelNode(l, ln);
}
View Code

cluster會根據收到的gossip包里的msgdata來更新集群的狀態信息,包括epoch,以及其余節點的狀態。如果node被標記為pfail或fail,則被加入fail_reports,當fail_reports長度超過半數節點數量時,該節點及被標記為failover。
 

總結:

      Redis Cluster采用無中心節點方式實現,無需proxy代理,客戶端直接與redis集群的每個節點連接,根據同樣的hash算法計算出key對應的slot,然后直接在slot對應的Redis上執行命令。從CAP定理來看,Cluster支持了AP(Availability&Partition-Tolerancy),這樣讓Redis從一個單純的NoSQL內存數據庫變成了分布式NoSQL數據庫。

 

參考文檔:

深入淺出 Redis Cluster 原理

Redis集群之Cluster

redis cluster 源碼閱讀之基本結構及Gossip

 

posted @ 2019-10-09 13:43 jyzhou 閱讀(...) 評論(...) 編輯 收藏
七乐彩2011年走势图南方双彩