快捷搜索:  汽车  科技

etcd存储什么数据的?Etcd源码分析存储3

etcd存储什么数据的?Etcd源码分析存储3至此,关于Etcd的源码解析,介绍这里基本结束。二、总结 case rd := <-r.Ready(): /* 匿名组合 调用node类对象中Ready方法 */ if rd.SoftState != nil { ... notifyc := make(chan struct{} 1) ap := apply{ entries: rd.CommittedEntries snapshot: rd.Snapshot notifyc: notifyc } //这个地方非常重要,用于通知数据已经应用到节点中,这个地方就是上一篇的输入 updateCommittedIndex(&ap rh) select { case r.applyc <- ap: case <-r.stopped: return } // the le

Etcd是分布式存储系统,当leader有数据变化,要及时更新到其他节点,这里就涉及到数据同步。

一、数据同步

上一篇介绍,Etcd接收到客户端的请求,会把相关数据传递到Raft状态机中,那么进入状态机之后如何处理呢?流程图如下:

etcd存储什么数据的?Etcd源码分析存储3(1)

type raftLog struct { // storage contains all stable entries since the last Snapshot. // 保存自最后一个snapshot之后所有稳定的entries // MemoryStorage storage Storage // unstable contains all unstable entries and snapshot. // they will be saved into storage. // 未提交的entries,最后会写到Storage,即MemoryStore unstable unstable // committed is the highest log position that is known to be in // stable storage on a quorum of nodes. // 最后一次提交的索引 committed uint64 // applied is the highest log position that the application has // been instructed to apply to its state machine. // Invariant: applied <= committed // 表示应用 已经把entry应用到状态机中 最后一个提交索引,applied始终小于等于Committed applied uint64 logger Logger } func (r *raft) appendEntry(es ...pb.Entry) { li := r.raftLog.lastIndex() for i := range es { es[i].Term = r.Term es[i].Index = li 1 uint64(i) } r.raftLog.append(es...) //先将entires 加入到unstable队列中,最后把数据提交到MemoryStorage中 r.prs[r.id].maybeUpdate(r.raftLog.lastIndex()) // Regardless of maybeCommit's return our caller will call bcastAppend. r.maybeCommit() } // 发送广播rpc,将raftLog中的数据发给其他节点,但是这里并没有真正发送到对端 // 只是把消息放到队列中 // bcastAppend sends RPC with entries to all peers that are not up-to-date // according to the progress recorded in r.prs. func (r *raft) bcastAppend() { for id := range r.prs { if id == r.id { continue } r.sendAppend(id) } }

真正发送到对端是在raft.go文件中start方法:

case rd := <-r.Ready(): /* 匿名组合 调用node类对象中Ready方法 */ if rd.SoftState != nil { ... notifyc := make(chan struct{} 1) ap := apply{ entries: rd.CommittedEntries snapshot: rd.Snapshot notifyc: notifyc } //这个地方非常重要,用于通知数据已经应用到节点中,这个地方就是上一篇的输入 updateCommittedIndex(&ap rh) select { case r.applyc <- ap: case <-r.stopped: return } // the leader can write to its disk in parallel with replicating to the followers and them // writing to their disks. // For more details check raft thesis 10.2.1 if islead { /* 如果是leader */ // gofail: var raftBeforeLeaderSend struct{} // 将数据,真正发送到对端 r.transport.Send(r.processMessages(rd.Messages)) } // gofail: var raftBeforeSave struct{} if err := r.storage.Save(rd.HardState rd.Entries); err != nil { plog.Fatalf("raft save state and entries error: %v" err) } if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) } // gofail: var raftAfterSave struct{} if !raft.IsEmptySnap(rd.Snapshot) { // gofail: var raftBeforeSaveSnap struct{} if err := r.storage.SaveSnap(rd.Snapshot); err != nil { plog.Fatalf("raft save snapshot error: %v" err) } // etcdserver now claim the snapshot has been persisted onto the disk notifyc <- struct{}{} // gofail: var raftAfterSaveSnap struct{} r.raftStorage.ApplySnapshot(rd.Snapshot) plog.Infof("raft applied incoming snapshot at index %d" rd.Snapshot.Metadata.Index) // gofail: var raftAfterApplySnap struct{} } r.raftStorage.Append(rd.Entries) // MemoryStorage if !islead { /* 如果不是leader */ ... } else { // leader already processed 'MsgSnap' and signaled notifyc <- struct{}{} //通知 }

备注:

这里有一个问题需要跟大家说明(如果我猜测是对的):Raft协议要求,client发送日志给leader,leader需要把数据同步给follower并且收到大部分follower的响应之后,leader在把响应发给client。然而在Etcd中貌似没有这样实现,Etcd是立刻想应该client然后在同步给Follower。也就是说Etcd没有等待Follower响应。为什么要做成这样子呢?

我猜测的原因:Etcd是支持单节点,如果Etcd没有组成集群,那么功能就不能用了吗?显然不是。

二、总结

至此,关于Etcd的源码解析,介绍这里基本结束。

大概花了一个月时间去研读Etcd源码(只能利用零散时间)。下面闲扯一下:

为什么选择Etcd:

1)首先对Raft比较感兴趣,虽然ODL中的集群也是利用Raft协议实现的,但是感觉ODL过于复杂,不适合我。

2)对于分布式存储,兴趣比较浓厚,且在公司搭建DevOps环境时,也用到这个软件。

阅读Etcd收获:

1)Etcd采用go语言编写,有些语法比较奇怪,唯一好的一点,由于C语言基础,阅读起来并不是很费劲。但是基于go语言开发还是需要后期努力。

2)由于Etcd静态存储以及数据同步采用grpc格式数据,这次对于grpc数据格式有了进一步认识(虽然曾经研究过一次)。

3)go语言的一大亮点在于channel,因为channel的存在,能减少多线程并发问题。其实channel对于我来说并陌生,我就是把它理解成linux管道(pipe)。然而channel虽然很便利,但是在Etcd中运用了大量的channel,这使得其复杂度加深。

4)曾经基于libmicrohttpd为项目开发一个http server,自认为对http协议比较了解,然而当阅读到Etcd代码之后,代码实现可以与协议规定不一致(利用http通道处理集群建通信)。

后续学习方向:

1)对于Etcd性能,并没有深入研究。主要原因是项目这边没有数据,没有使用场景(简单场景),也只能通过相关报道继续跟踪。

2)go语言内置http框架,需要进一步研究,需从源码级进行了解。

3)很早之前就知道,grpc报文是无法解析(wireshark),但若能解析呢?

etcd存储什么数据的?Etcd源码分析存储3(2)

猜您喜欢: