Skip to content

Latest commit

 

History

History
1928 lines (1399 loc) · 89.2 KB

05.block-syncing.md

File metadata and controls

1928 lines (1399 loc) · 89.2 KB

区块的同步和下载


在 p2p 的章节中,我们知道以太坊网络上每个 peer 都是一个 devp2p 的实例,peer 之间的数据通过 RLPx 加密通信,在 devp2p 之上就是各种协议,比如

  • Ethereum protocol(eth, les);
  • Swarm protocol(bzz);
  • Whisper(shh);

通过这些协议构建整个 decentralized blockchain 世界,如下图所示(原图见 ethereum-protocols.png

image-20181018150313192

针对 Ethereum 协议,新节点要想加入到网络中,就需要实现 eth/les 协议,接下来我们要讲解如何根据 RLPx 消息协议完成区块的同步工作。

一个 Peer 的节点的协议层分布如下:

image-20180824095200301

类似于 TCP/IP 经典网络栈,我们可以把这个协议栈称为 ETH/IP 网络栈,接下来的内容将讲解 ETH/IP 网络栈的应用层内容,这部分内容将着重关注 Peer 之间如何通过 eth 或者 les 协议互通(以 eth 为主要分析模块)。

P2P 通信的管理模块 ProtocolManager

Ethereum 中,管理 Peer 之间通信的顶层结构体叫 eth.ProtocolManager,它也是 eth.Ethereum 的核心成员变量之一。ProtocolManager 中可以管理多个以太坊的子协议(eth62eth63),其结构体定义如下:

//eth/handler.go
type ProtocolManager struct {
    networkID uint64

    fastSync  uint32
    acceptTxs uint32

    txpool      txPool
    blockchain  *core.BlockChain
    chainconfig *params.ChainConfig
    maxPeers    int

    downloader *downloader.Downloader
    fetcher    *fetcher.Fetcher
    peers      *peerSet

    SubProtocols []p2p.Protocol

    eventMux      *event.TypeMux
    txsCh         chan core.NewTxsEvent
    txsSub        event.Subscription
    minedBlockSub *event.TypeMuxSubscription

    newPeerCh   chan *peer
    txsyncCh    chan *txsync
    quitSync    chan struct{}
    noMorePeers chan struct{}

    wg sync.WaitGroup
}

ProtocolManager 主要成员包括:

  • 几个行为状态开关
    • fastSync: 是否允许 FastSync 模式同步;
    • acceptTxs:是否允许接受其它节点上的 Tx,当节点没有完全同步的时候,无法同步其它节点发送过来的 Tx;
  • peertSet 类型成员用来缓存 Peer 实例列表,Peer 表示网络中的一个远端实例, 其定义如下:
//eth/peer.go
type peer struct {
    id string

    *p2p.Peer
    rw p2p.MsgReadWriter

    version  int
    forkDrop *time.Timer

    head common.Hash
    td   *big.Int
    lock sync.RWMutex

    knownTxs    mapset.Set
    knownBlocks mapset.Set

    queuedTxs   chan []*types.Transaction
    queuedProps chan *propEvent
    queuedAnns  chan *types.Block
    term        chan struct{}
}

可以看出来,一个 peer 结构代表了一个远端节点的所有特征:

  • p2p 层面的请求连接;
  • Ethereum 层面的特性,区块高度,节点上的 Tx,Block 等信息,根据这些信息后续可以决定这个节点的价值;

交易和区块在内的数据更新通过 channel 和 subscription 的方式接收和发送,当然在 geth 内部,也往往利用 channel 来实现 subscription 机制;

  • p2p.Protocol 由 PM 管理的协议及其实现;

  • Downloader 类型成员负责所有向相邻实例主动发起的同步流程;

  • Fetcher 类型成员累积所有其他实例发送过来的有关新数据的公告消息,并与自身数据对比后,安排相应的数据下载请求;

  • 大量的 channel 成员,用于各个线程之间数据传递;

注意:

这里提到的 "远端" 实例,即非本 peer 的其他 peer 对象。以太坊的 P2P 网络中,所有进行通信的两个 peer 都必须率先经过相互的注册协商(支持的协议和版本号是否匹配),并被添加到各自缓存的 peer 列表中,也就是 peerSet 对象中,这样的两个 peer,就可以称为“相邻”。所以,这里提到的 “远端 " 实例,如果处于可通信状态,则必定已经 “相邻”。

前面提到 P2P 架构中,个体之间是平等的,即给其它 peer 提供服务,亦提交工作给其它 peer,反映在 Ethereum 上,PM 即对外主动发布消息,比如发布新区块和交易数据;亦接收其它 peer 的数据请求,比如区块数据、交易数据下载请求,接下来分别看待这两部分内容。

作为 client 主动对外发布消息

Start() 函数是 PM 的启动函数,它会在 eth.Ethereum.Start() 中被主动调用。启动之后,PM 订阅 NewTxsEventNewMinedBlockEvent 这两个事件,同时启动 4 个线程执行不同的事项:

//eth/backend.go
func (s *Ethereum) Start(srvr *p2p.Server) error {
    ..
    s.protocolManager.Start(maxPeers)
    ...
}
//eth/handler.go
func (pm *ProtocolManager) Start(maxPeers int) {
    pm.maxPeers = maxPeers

    pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
    pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
    go pm.txBroadcastLoop()

    pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
    go pm.minedBroadcastLoop()

    go pm.syncer()
    go pm.txsyncLoop()
}

以上这四段相对独立的业务流程的逻辑分别是:

pm.txBroadcastLoop()

广播新出现的 Tx,作为转发节点使用。

image-20181018165534907

一旦在 txCh 接收到新出现的 Tx(TODO:由谁发出来的),会立即调用 BroadcastTx() 函数异步广播给那些尚无该交易对象的相邻个体,达到全网交易数据的同步。

pm.minedBroadcastLoop()

广播本地新挖掘出的区块,抢夺矿工收益。

image-20181018170605323

一旦本地有区块被挖掘出来,立即广播给 Peer,很有意思的是,在收到 NewMinedBlockEvent 事件之后,这一线程会连续调用两次 BroadcastBlock(),两次调用仅仅一个 bool 型参数 propagate 不一样,第一次为 true, 第二次为 false:

  • 当该参数为 true 时,会将整个新区块依次发给相邻 peer 中的一小部分($\sqrt{(peers)}$),推测目的是为了让这些 peer 帮助快速验证,进而有机会将新区块快速加入到主链上;
  • 而当其为 false 时,仅仅将新区块的 Hash 值和 Number 发送给所有相邻 Peer。

pm.syncer()

定时与相邻 Peer 进行 BlockChain 级别的强制同步,本地区块高度不小于 Peer。

image-20181018171631615

首先启动 fetcher 成员,然后进入一个无限循环,每次循环中都会向相邻 peer 列表中 “最优” 的那个 peer 作一次区块全链同步,同步的过程就是调用 downloader 成员从 Peer 处不断下载本地缺失的 Block。发起上述同步的触发点分两种:

  1. 如果有新注册(加入)的 peer,则在 peer 列表总数大于 5 时,发起同步;
  2. 以 10s 为间隔定时的强制同步。

这里所谓 "最优" 指的是 peer 中所维护区块链的 TotalDifficulty 最高,由于 TD 是全链中从创世块到最新块的 Difficulty 值总和,所以 TD 值最高就意味着它的区块链是最新的,跟这样的 peer 作区块全链同步,显然数据量是最真实有效的,此即 "最优"。

当接收到 noMorePeers 的消息之后,退出同步流程,这个更多是在本端节点离开了网络之后处理的。

pm.txsyncLoop()

当有新 Peer 连接到本地之后,将本地 txpool 中的 Tx 列表同步给他们(在每个 Peer 成功连接之后)。

image-20181018172645183

负责为每个新连接的 Peer 初始化 transaction 列表,当新的 peer 加入时,转发当前 txpool 中所有的 Tx,同时从工程学上考量,为了最小化出口带宽利用率,一次只将部分 transaction 发送给一个 Peer,Peer 之间有连接发生的时候会再次同步,均化每个节点上的带宽使用率,最终的结果就是所有 Tx 同步到所有的 Peer 上。

这一线程的主体也是一个无限循环,它的逻辑稍微复杂一些:

  1. 首先有新节点连接上来的时候,通过 PM 调用 pm.syncTransactions()pm.txsyncCh 中发送一个 txsync{p, txs}, 包含 peer 和 Tx 列表;
  2. 这个线程从 txsyncCh 中收到新数据,则将它存入一个本地 map 结构的 pending 变量中,key 为 peer.ID,value 为 txsync,开始往 peer 发送本地的数据;
  3. 每次向 peer 发送 tx 对象的字节大小上限为 100KB,如果 txsync 对象中没有剩余 tx,则将 peer 从 pending 变量中删除,将发送结果放入到 done channel 中;
  4. txsync chan 中没有新数据到达,并且上次已经将 tysync 数据发送给 peer 了,则从 pending 对象中随机找出一个 txsync 对象,将其中的 tx 组发送给相应的 peer。

这四个线程就是 ProtocolManager 向相邻 peer 主动发起的通信过程,总结一下:

  • 本端 peer 向其他 peer 主动发起的通信中,按照数据类型可分为两类:
    • 交易 Tx:
    • 对于新建立的连接,将 txpool.Pending() 中的交易信息同步给对方;
    • 本地发送了一个交易,或者是接收到别人发过来的交易信息,txpool 会产生一条消息,消息被传递到 txCh channel;然后通过 pm.txBroadcastLoop() 处理, 发送给其他不知道这个交易的 peer;
    • 区块 block;
    • 如果是自己挖出来的,通过 pm.minedBroadcastLoop() 广播给相邻 peer;
    • pm.syncer() 定时与 BestPeer 同步信息。
  • 而按照通信方式划分,亦可分为广播新的单个数据和同步一组同类型数据。

这样简单的两两配对,便可组成上述四段流程。

上述函数的实现中,很多地方都体现出巧妙的设计:

  • 比如 BroadcastBlock() 中,如果发送区块 block,由于数据量相对较大,那么仅仅发送给一小部分相邻 peer;而如果是发送 Hash + Number,则发给所有相邻 peer;
  • 又比如 txsyncLoop() 中,会从 pending 中随机选择一个 peer 进行发送 (随机选择的 txsync{} 中包含 peer)。

这些细节,很好的控制了单次业务请求的资源消耗对于定向区域的倾向性,使得整个网络资源消耗愈加均衡,体现出非常全面的设计思路。

作为 server 处理其他 peer 的请求

对于 Peer 间通信而言,前面只介绍了己方主动向对方 Peer 通信的场景,另一种就是由对方 Peer 主动发起的请求,这种通信就是 Web 领域大家熟知的服务端的案例,基本原理比较简单,客户端和服务端之间定义一种统一的标准(读写协议),服务端根据客户端的请求发送相应的响应回去。

当 P2P 的 peer 启动的时候,会主动的找节点(bootnode)去连接,也可能同时被其他的节点先连接上来。连接的过程就是:

  1. 首先进行加密信道的握手,
  2. 然后进行 p2p.Protocol 协议的握手,
  3. 最后为每个协议启动 goroutine 执行 p2p.Protocol.Run 方法来把控制权交还给最终的协议。

PM 中的这个 Run 方法首先创建了一个 Peer 对象,然后调用了 handle 方法来处理这个 Peer 后续整个生命周期:

//eth/backend.go
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
    peer := manager.newPeer(int(version), p, rw)
    select {
    case manager.newPeerCh <- peer:
        manager.wg.Add(1)
        defer manager.wg.Done()
        return manager.handle(peer)
    case <-manager.quitSync:
        return p2p.DiscQuitting
    }
}

handle() 的处理流程如下:

image-20181018183508939

handle() 函数针对一个新 Peer 做了如下几件事:

  1. 与 Peer 握手沟通双方的区块链状态,协商版本号、网络 ID、difficulties、head and genesis block;

  2. 初始化一个读写 channel,用以跟对方 Peer 相互数据传输;

  3. 注册对方 Peer,存入己方 peerset 中,只有 handle() 函数退出时,才会将这个 Peer 移除出列表;

    在注册的同时,开启一个线程用于将 Peer 上面的数据发送到远端:

//eth/peer.go
func (p *peer) broadcast() {
    for {
        select {
        case txs := <-p.queuedTxs:
            p.SendTransactions(txs)
        case prop := <-p.queuedProps:
            p.SendNewBlock(prop.block, prop.td)

        case block := <-p.queuedAnns:
            p.SendNewBlockHashes([]common.Hash{block.Hash},
                                 []uint64{block.NumberU64()})
        case <-p.term:
            return
        }
    }
}

前面说 pm 当作 client 的时候,将数据发送给 Peer,实则是把数据放到 Peer 的 channel 中,由 Peer 中另一个线程异步发送出去的。

  1. 把 Peer 注册给 pm.downloader. 如果 downloader 认为这个 peer 被禁用了,那么断开连接。 pm.downloader 会自己维护一个相邻 Peer 列表;

  2. 将本节点上当前 txpool 中累计的所有 transaction 对象同步给对端 Peer;

  3. 如果本节点是 Dao 硬分叉感知的,那么验证 Peer 对于 Dao 硬分叉也是感知的,不然就把 Peer 从 pm 中踢出去;

    为何要验证 peer 也是 Dao 硬分叉(区块高度 1920000)感知的,本质上只要保证本节点和 peer 处于链的同一侧即可,如果两者位于硬分叉之后的同一“侧”链上,那么两端都会有 DaoForkBlock 存在,这下验证就变得简单了,我们只要看下 Peer 这一侧的 DaoForkBlock 和本端是否相同就可以了,因此在基本的协议握手之后需要向 Peer 发起一个 GetBlockHeadersMsg 获取 Peer 的 DaoForkBlock。这个消息的返回可能有下面两种情况:

    1. Peer 当前区块 $&lt;$ 本地的 DaoForkBlock,返回的 headers 为空,也就是 Peer 的区块高度还在分叉点之前,这时需要保证 Peer 当前区块难度 $\le$ DaoForkBlock.TD 即是合理的,也就不用验证了;

    2. Peer 中存在 DaoForkBlock,返回的 headers 不为空,这就更加简单了,只要和本地的 DaoForkBlock 比较是否完全一致即是合理的;

    ETC 上面 1,920,000 高度的区块数据:

    image-20181018182134494

    ETH 上面(当前的主网)1,920,000 高度的区块数据:

    image-20181018182236409

  4. 循环调用 handleMsg() 处理消息,当对方 Peer 发出任何 msg 时,handleMsg() 处理接收到的各种消息,作为相对应的回复或者措施。

handle() 处理结束的时候,将 Peer 从 peerset 中断开连接。

handleMsg() 处理协议消息

经过一系列的检查和握手之后,调用了 pm.handleMsg 方法来处理事件,在这个方法中根据不同的消息类型走对应的处理流程:

//eth/handler.go
func (pm *ProtocolManager) handleMsg(p *peer) error {
    // Read the next message from the remote peer,
    // and ensure it's fully consumed
    msg, err := p.rw.ReadMsg()
    ...
    defer msg.Discard()

    // Handle the message depending on its contents
    switch {
        ...
    }
    return nil
}

P2P 架构相比于 CS 架构一个明显的差别是,P2P 中一个节点即可以是 client,同时又可以是 server,所以会有一些对等协议的出现:有一方发送,就有另一方接收。

从 Peer 处读取完所有数据后匹配消息类型,根据 eth/protocol.go 中定义的消息类型,分别来看下每个消息的内容:

eth62

  1. 0x00 StatusMsg

    > StatusMsg 只存在于 handshake 阶段,错过了 handshake 阶段这个消息就不可能存在了(所以在这里收到了的话,应当返回错误)。

  2. 0x01 NewBlockHashesMsg -> Hash,Number

    > Peer 公告已经有一个新的 Block 被挖出来了,本地节点需要通知 fetcher 有一个潜在的 Block 待下载。

  3. 0x02 TxMsg-> []*types.Transaction

    > 接收 transaction 信息,前面说了 Peer 连接上来之后,会将本节点上 txpool 上的 Tx 同步给 Peer, Peer 在接收 TxMsg 的时候需要确保本端节点已经完成同步了(在 downloader.Synchronise() 已经完成,pm.acceptTxs 开关已经打开),Peer 接收到的 Tx 数据被加入到本端的 remote txpool 中,等待后续打包。

  4. 0x03GetBlockHeadersMsg -> HashOrNumber,Amount,Skip,Reverse

    > 根据指定的 Hash 或 Number 开始获取一批 Block 的 Header,从 blockchain 中取出 header 数据之后,发送回对端。

    这里在注意的是,在检索查询过程中,整数不要溢出了。比较好奇的是,Query 参数中可以指定一个 Skip 参数,表明是否要在连续的区块中跳过一些区块,对端可以同时开多个线程并发同步 Header,每个线程彼此分开起点,这样子加快 Header 的同步耗时,在实际使用中,根据 Hash 获取顺序批量数据的查询方式最为常见,在下方 fetcher 章节中细讲。

  5. 0x04 BlockHeadersMsg-> []*types.Header

    > 为 GetBlockHeadersMsg 消息的回应消息,代表之前请求的一批 header 已经到达了本端,本端接收完所有 headers 之后,传送给 downloder。

  6. 0x05 GetBlockBodiesMsg -> []Hash

    > 与 GetBlockHeadersMsg 类似,根据 Hash 值从 blockchain 中取出一批 Block 之后,发送回对端。

  7. 0x06 BlockBodiesMsg -> []*blockBody

    > 与 BlockHeadersMsg 类似,本端接收所有的 Body 信息,并传送给 downloader。

  8. 0x07 NewBlockMsg -> Block,TD

    > 接收 BlockMsg 消息,将 Block 交给 fetcher 排队处理。

eth63

由于这几个消息都是在 eth63 加入的,需要确保对端的版本和消息的类型是匹配的(p.version >= eth63)。

  1. 0x0d GetNodeDataMsg -> []StateDB TrieNode

    > 注意:这里的 Node 并不是 P2P Node 或者 Ethereum Node,而是 StateDB 中的 Trie Node,用于从远端恢复整棵 StateDB Trie 树,这个是 eth/downloader/statesync.go 中请求的内容,最后调用到 trie.Database.Node(hash) 接口。

  2. 0x0e NodeDataMsg -> [][]byte

    > 本端接收到 StateDB Trie Node 之后传送给 downloader 去处理。

  3. 0x0f GetReceiptsMsg -> []Hash

    > 对端通过 ReceiptHash 请求 Receipt 内容,用于 Fast Sync 同步方式。

  4. 0x10 ReceiptsMsg -> [][]*types.Receipt

    > 本端接收到的 Receipts 数据传送给 downloader 去处理。

downloader 与 fetcher

downloader 与 fetcher 都是用于数据下载的场景,但是两者有本质的区别:

  1. downloader 用于历史区块的下载,而 fetcher 用于新区块的下载(收到 NewBlockMsg);
  2. fetcher 在下载新区块的同时,需要往其它 Peer 转发收到的区块,以使新区块快速被网络接受;
  3. 在接收到 BlockHeadersMsgBlockBodiesMsg 的时候,fetcher 的优先级高,需要先处理,只有 fetcher 无法(不想)处理的消息才由 downloader 完成后续处理;

从这一点上可以看出来 Ethereum 对于细节的把控还是很精确的。

Downloader

在了解同步事宜之前,我们先看下有哪些同步的方式,当前有三种模式:

  • 第一种为 full mode(全量同步模式,也是 geth 客户端最开始的实现方式)

    这种模式通过分开下载 Header 和 Body 来构建 BlockChain,而同步的过程就和普通的 Block 插入的过程一样,包括:

    Header 下载 ➡️ Header 验证 ➡️ Block 下载 ➡️ Transaction 验证 ➡️ Transaction 执行 ➡️ Receipt 生成 ➡️ StateDB 状态变更

    这种方式由于会将每一个 Block 都会在本地 replay 一遍,当 geth 客户端开启了 gcmode=archive 模式的情况下,会把所有的历史数据都保存下来,根据保存下来的数据可以重现每一个 Tx 执行的前后变化情况,在数据分析的过程中非常有用(使用 trace_transaction RPC 接口可以取到完整的每个 Tx 执行情况);同时也带来了一定的劣势,即 Tx 一步步串行执行,当区块高度很大的时候,对应着 Tx 也会特别多,那么这其实是一个非常消耗资源的过程,区块同步的时间也非常长。

  • 第二种为 fast sync 模式(当前的默认模式)

    这是相对省资源的一种方式,fast sync 的模式会下载 Header, Body 和 Receipt,插入的过程不会执行交易,因此也不会产生 StateDB 的数据,然后在某一个区块高度(最高的区块高度 - 1024)的时候同步完成所有的 StateDB 中的数据,作为本节点上的初始 StateDB Root 信息,最后的 downloader.fsMinFullBlocks(当前为 64)个区块会采用 full mode 的方式来构建。这种模式会缩小区块的同步时间(不需要执行交易),同时不会产生大量的历史的 StateDB 信息(也就不会产生大量的磁盘空间),但是对于网络的消耗会更高(因为需要下载 Receipt 和 StateDB),与 full sync 相比较,fast sync 是用网络带宽换取 CPU 资源

    关于 fast sync 更多细节可以查看 go-ethereum#1889

  • 最后一种为 light mode

    这种模式只下载 Header, 一般用于 Light Ethereum,使用场景有限:

    TODO: 补充使用场景

downloader 负责区块数据的同步下载、插入本地区块链的工作,在上一节讲过,pm.syncer 线程会在节点加入网络,或者定时主动同步工作,而同步的过程就是由 downloader.Synchronise() 发起一个同步消息之后,等待 peer 返回 Header, Block, Receipt 等数据,当 pm 接收到回应之后,将数据传送给 downloader 的 channel 中,由相应的 goroutine 通过监听 channel 把从网络上得到的数据载入到本地 BlockChain 中,完成本地和远程的同步。downlower.Downloader 结构体中的成员比较多,这里不单独罗列出来,我们把这一结构体拆分成下面几类成员,分别讲解。

LightChain

注意,这里的 LigthChain 是 downloader 模块自定义的接口类型,用于处理由 Header 组成的 chain 结构中的事宜,在 fast 与 light 同步方法上会被用到,并不是 light/lightchain.go 中定义的 LightChain(后者只要实现了前面的接口定义也是可以的)。其接口要求实现下面的方法:

//eth/downloader/downloader.go
type LightChain interface {
    // HasHeader verifies a header's presence in the local chain.
    HasHeader(common.Hash, uint64) bool

    // GetHeaderByHash retrieves a header from the local chain.
    GetHeaderByHash(common.Hash) *types.Header

    // CurrentHeader retrieves the head header from the local chain.
    CurrentHeader() *types.Header

    // GetTd returns the total difficulty of a local block.
    GetTd(common.Hash, uint64) *big.Int

    // InsertHeaderChain inserts a batch of headers into the local chain.
    InsertHeaderChain([]*types.Header, int) (int, error)

    // Rollback removes a few recently added elements
    // from the local chain.
    Rollback([]common.Hash)
}

可见,都是针对 Header 的行为方法,具体见上面的代码注释。

BlockChain

同 LightChain, 也是 downloader 模块定义的一个接口,用于 full 与 fast 同步模块,这一接口“继承”自 LightChain, 在此之上又实现了另外也个方法:

type BlockChain interface {
    LightChain

    // HasBlock verifies a block's presence in the local chain.
    HasBlock(common.Hash, uint64) bool

    // GetBlockByHash retrieves a block from the local chain.
    GetBlockByHash(common.Hash) *types.Block

    // CurrentBlock retrieves the head block from the local chain.
    CurrentBlock() *types.Block

    // CurrentFastBlock retrieves the head fast block
    // from the local chain.
    CurrentFastBlock() *types.Block

    // FastSyncCommitHead directly commits the head block
    // to a certain entity.
    FastSyncCommitHead(common.Hash) error

    // InsertChain inserts a batch of blocks into the local chain.
    InsertChain(types.Blocks) (int, error)

    // InsertReceiptChain inserts a batch of receipts
    // into the local chain.
    InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
}

可见,在 Header 的基础上增加了 Block, Receipt 的属性方法。另外,针对 fast sync 模式,有几个单独的接口:

  1. FastSyncCommitHead 用于保存 fast sync 与 full sync 分界点处的那一个 block,需要确保在这一个区块上的 StateDB 数据是完整的,不然后面走 full sync 就有问题了;
  2. InsertReceiptChain 用于保存 Block 与其对应的 Receipt;

channel 类

从 downloader 的角度出发,用于接收传递数据的 channel 有:

channel msg.Code note
headerCh BlockHeadersMsg 区块头,也是同步的开始
headerProcCh Header 下载之后,用于推进下一步的操作
bodyCh BlockBodiesMsg 包括了 Tx 和 Uncle 数据
receiptCh ReceiptsMsg 包括了 Receipt 数据,fast sync 中用到
stateCh NodeDataMsg StateDB Trie 中的数据,用于同步 StateDB

注意: NodeDataMsg 是在 eth63 协议中加入的,原先是不支持的(因为原先只支持 full sync, 而 StateDB 的数据是一个个区块执行后累积生成的,因此没有 State 同步的需求),而 downloader 中对于 StateDB 的数据也是单独同步处理的,因为这个数据不需要全量保存,只要拥有最近的数据即可。

另外,还有几个 channel 用于通知中止任务:

channel note
bodyWakeCh 通知 fetchParts,Block 全部下载完了,可以退出
receiptWakeCh 通知 fetchParts,Receipt 全部下载完了,可以退出

由于这几种同步方式都是先从 Header 下载开始,当 Header 下载验证完成了,需要通知可以开始下载 Block 和 Receipt 了;当没有更多的 Header 出现时,需要通知 Block 和 Receipt 可以停止下载了,downloader 中用了两个 bool channel 用于控制这一迭代式的下载过程,也就是上面的 bodyWakeChreceiptWakeCh

Synchronise 同步下载

Synchronise() 试图与一个 peer 根据 Hash 和 Total Difficulty 来发生同步,如果是 fast sync 的模式,并且 Peer 比本端的 td 要小,说明本地比 peer 要新,那就没有必要再和他同步了;而如果同步过程中遇到一些错误(当前超时了,Peer 无效等),那就删除掉 Peer, 等待下一次找其它节点同步。

//eth/downloader/downloader.go
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int,
                                 mode SyncMode) error {
    // 1. ensure only one synchronising goroutine is running
    if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) { return errBusy }
    // 2. reset the queue
    d.queue.Reset()
    d.peers.Reset()
    // 3. clear d.bodyWakeCh, d.receiptWakeCh
    // 4. clear d.headerCh, d.bodyCh, d.receiptCh, d.headerProcCh
    defer d.Cancel()
    ...
    // 5. sync peer with peer's hash, td
    return d.syncWithPeer(p, hash, td)
}

真正发生同步事宜:

//eth/downloader/downloader.go
func (d *Downloader) syncWithPeer(p *peerConnection,
                                  hash common.Hash, td *big.Int)
(err error) {
    ...
    latest, err := d.fetchHeight(p)

    height := latest.Number.Uint64()
    // findAncestor to find the common ancestor
    // between self and peer
    origin, err := d.findAncestor(p, height)

    d.syncStatsLock.Lock()
    if d.syncStatsChainHeight <= origin ||
        d.syncStatsChainOrigin > origin {
        d.syncStatsChainOrigin = origin
    }
    d.syncStatsChainHeight = height
    d.syncStatsLock.Unlock()

    // Ensure our origin point is below any fast sync pivot point
    pivot := uint64(0)
    if d.mode == FastSync {
        if height <= uint64(fsMinFullBlocks) {
            origin = 0
        } else {
            pivot = height - uint64(fsMinFullBlocks)
            if pivot <= origin {
                origin = pivot - 1
            }
        }
    }
    d.committed = 1
    if d.mode == FastSync && pivot != 0 {
        d.committed = 0
    }

    // Initiate the sync using a concurrent header and content retrieval algorithm
    d.queue.Prepare(origin+1, d.mode)
    if d.syncInitHook != nil {
        d.syncInitHook(origin, height)
    }

    fetchers := []func() error{
        func() error { return d.fetchHeaders(p, origin+1, pivot) },
        func() error { return d.fetchBodies(origin + 1) },
        func() error { return d.fetchReceipts(origin + 1) },
        func() error { return d.processHeaders(origin+1, pivot, td) },
    }
    if d.mode == FastSync {
        fetchers = append(fetchers, func() error {
            return d.processFastSyncContent(latest)
        })
    } else if d.mode == FullSync {
        fetchers = append(fetchers, d.processFullSyncContent)
    }
    return d.spawnSync(fetchers)
}
  1. 获取 Peer 的最高区块(记作 PH);

  2. 从本节点当前的当前区块高度和 Peer 的最高区块(PH)中找到共同的祖先区块(记作 LH),然后从共同祖先到 Peer 当前最高区块之间的区块都是待同步的区块;

  3. 确定同步的起点和高度,用于状态监控:

    • 监控的已知最高区块高度设为 PH;
    • 监控的起始当前区块高度设为祖先区块(如果当前已知区块高度 $\le$ 祖先区块,或者当前最高区块高度 $&gt;$ 祖先区块)
  4. 当节点以 fast sync 模式同步的时候,需要保证最后的 fsMinFullBlocks(64) 个区块是走 full sync 模式同步,那么 fast sync 同步的区块起始点选择就有差别了:

    • $PH \le fsMinFullBlocks$:这种情况下起点就需要从 genesis 区块开始同步,相对来说数据量也不大,从创世区块开始同步也没什么问题;
    • $PH - LH \le fsMinFullBlocks$: 这种情况下说明本节点上的数据和 Peer 基本上是一致的,这种情况多发生在本节点故障重启之后的同步,而不是空节点的同步。由于需要保证最后的 fsMinFullBlocks 是走 full sync 的方式,所以同步起点为:$PH - fsMinFullBlocks$,相当于本地区块回退了一部分之后,走 full sync;

    需要注意的是,虽然这里起点的位置确定了(固定),但是结束的位置虽然这里选择了 $PH - fsMinFullBlocks$,但是在同步过程中,Peer 的区块也在增长,所以这个结束位置也是在不断增长的,后期需要调整!

  5. 启用多个线程,分别用于获取 Header, Body, Receipt,另外有线程分别用于处理 full sync 和 fast sync:

    1. fetchHeaders:不断重复地从 peer 获取 GetBlockHeadersMsg 请求,并将每次收到的消息发送到 headerProcCh 中。

    为了提高并发性,同时能够防止恶意节点发送错误的 header 过来,使用我们正在同步的 “origin” peer 构造一个 HeaderChain Skeleton,并使用其他 peer 填充其中缺失的 header。可以认为只有其他 peer 的 header 能够填充到骨架上时才是安全可靠的,可被接受的。如果没有人能够填充骨架 - 甚至 origin peer 也不能填充的话 - 它被认为是无效的,并且 origin peer 也被丢弃。

    1. processHeaders: 这个线程可以认为是一个带过滤功能的分发器,从 headerProcCh 中获取到 header 信息,经过一系列的验证检查之后将合格的 header 丢入到 downloader.queue 队列中,供后续的 fetchBodiesfetchReceipts 线程从队列中拉取任务。

    这里并没有直接使用 channel 来传递任务,而是用了通用的 priority queue 模型来处理,这样子对于并发控制,优先级任务等都可以实现了,另外也是由于这一结构的特殊性,在讲解 processFastSyncContent 的时候再行讲解。

    1. fetchBodies: 通过 fetchParts() 持续的下载 Block Body 的信息。

    2. fetchReceipts: 与 fetchBodies 类似,也是通过 fetchParts 持续的下载 Receipt 的信息。

    3. processFastSyncContent: 对于 fast sync 的模式下,从 queue 中取到 Block 和 Receipt 之后,分别插入到 BlockChain 和 Receipt 中。

    4. processFullSyncContent: 对于 full sync 的模式下,从 queue 中取到 Block 之后,将之插入到 BlockChain 中。

接下来,我们来细细分析每个线程的工作,首先看下同步起点的选择这一问题。

findAncestor 查找祖先节点

通常情况下,如果本地节点是经常同步在主网上的,那么只需要查看最新的 N 个区块是否一致就可以找到共同祖先;但是也有一些极端情况,可能两者的差距非常大,这种情况下可以使用二分法快速找到一个共同祖先。

我们假定下面几个变量:

  1. $S$: 待查找的共同祖先;
  2. $LH$: 本地的当前区块高度(取値与同步模式有关);
  3. $PH$: Peer 当前区块高度;

查找祖先的过程可以简化为以下几个步骤:

  1. 首先确定本次查找的上下边界:$S \in [max(0, LH - 3 * PoWEpoch), min(LH, PH)]$;

    • 如果 $LH \ge 3 * PoWEpoch$ (90000),这种情况下,不需要再从 0 开始同步,下边界可以往上移到 $LH - 90000$
  2. 优先考虑从最新的 Top N 个区块开始查找,查找起点为 $min(LH, PH)-MaxHeaderFetch$,区块间隔设置为 15,最多查找 $2 * MaxHeaderFetch / 16$ 个区块。其中 $MaxHeaderFetch$ 为 192,也就是说获取本地和 Peer 区块中高度较低者中最新的 192 个区块,中间间隔 15,最多获取 24 个区块;

    上面判断 Peer 区块比本地的高,用的是 TD 来判断的,虽然说 TD 比本地高对应的区块 Number 也应该比本地的高,但是在查找祖先的时候,用的是 Number 来判断,这样子这个函数的入口就有可能会是 Peer Number 比本地的还要低,所以要取 $min(LH, PH)$,但是这样子的话,对于上限 24 是否就没有必要设置了呢?其实不然,如果说当本地链比 Peer 要高,Peer 同时也在同步中,这个时候你去 fetch Peer 一些高度的话,那么我们多取一点也没有问题,保证链是完整的。

    TODO:哪种场景下会出现?

  3. 从返回的 Top N 个区块从后往前查找,如果在本地的 BlockChain 中包含有这一区块,那就说明这个区块就是我们要查找的祖先,直接退出即可;

  4. 如果没有找到,那就说明两端的区块相差比较大,那就需要走二分查找法找到共同祖先,以上面确定下面的上下边界为基础,不断缩小范围查找,以下为二分查找伪代码,与 findAncestor 中的实现类似

start, end := floor, head

for start+1 < end {
    check := (start + end) / 2
    headers := peer.requestHeaderByNumber(check, 1, 0, false)
    h, n := headers[0].Hash(), headers[0].Number.Uint64()
    if (!localBlockChain.hasBlock(h, n)) {
        end = check
        continue
    }
    start = check
}

if start <= floor {
     // not found
} else {
     return start
}

以 start, end 中间的位置在 Peer 中作切割查找:

  1. 如果本地区块链上不存在这一 Block,说明这个位置太大了,将上边界下移到中间位置;
  2. 不然就说明这一个 Block 已经合适了,再往后查找一个区块高度更高的,将下边界上移到中间位置。

开始区块同步流程

同步的过程通过 Golang channel 与 prority queue 实现的一套数据 pipeline 的流程,从数据角度可以分为两类:

  1. 获取数据的过程,这一过程主要是 fetchXXX 类接口的调用,分别获取所需的数据;
  2. 导入数据的过程,这一过程主要是 processXXXSync 接口的调用。

fetchXXX 接口

XXX 对应的 header, body 与 receipt, 由多个 goroutine 分别控制,其流程如下:

image-20181024181505916

fetchHeaders 接口

dataflow:

  1. <- d.headerCh
  2. -> d.headerProcCh

fetchHeaders 是一切同步的基础,根据请求的 Number 不断并发地读取 Header,直到所有的 Header 返回为止,同时内置了流量控制器(由 queue 来实现),用于缓和流量峰值。在提高并发量的同时,为了避免恶意节点通过发送错误 Header 的攻击,fetchHeaders 内部使用 origin Peer 构造了一种名为 skeleton 的 Header chain:

image-20181021212804308

这一 chain 的特点是:

  1. chain 链长度最大为 128;
  2. header 之间不连续,固定间隔为 192;

当这一 skeleton chain 下载完成之后,使用本节点上的 Peerset 列表并发下载 chain 之间空缺的 Header;当一个 skeleton 上所有的 Header 都下载完毕之后,再请求 origin Peer 完成下一个 skeleton 下载。

注意:在请求 skeleton chain 的时候,请求的起点是 from + 192 - 1,因此取到的数据也是 from 往后开始的数据,先记住这里有一个 192 的差异。

processHeaders 接口

dataflow:

  1. <-d.headerProcCh
  2. -> d.queue

监听 d.headerProcCh ,将接收到的 Headers,根据同步模式做不同的处理:

  1. 对于 fast 和 light 同步模式下,将获取到的 Header 保存到 LightChain 中,在保存的时候,内部会先检查 Header 的有效性。另外,需要注意的是, eth 协议中 LightChain 就是 blockchain:
//core/handler.go
func NewProtocolManager(...,
 blockchain *core.BlockChain, chaindb ethdb.Database)
(*ProtocolManager, error) {
 ...
    manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
    ...
}

//eth/downloader/downloader.go
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
    if lightchain == nil {
        lightchain = chain
    }
 ...
}
 **注意**在插入过程中如果出错了需要 rollback这里的 rollback 机制设计比较奇巧1. 首先rollback 是用 `defer` 实现的也就是在函数调用结束的时候被调用到
 2. 其次当一次 `InsertHeaderChain` 返回失败的时候直接 returnrollback 被执行),这是比较直接的一种场景3. 另外还有一种不太直接的场景验证本地区块链是否真的已经被更新了1. 将所有本地还没有出现过的 Header 都扔到一个 unknown 列表中2. 保留最后的 fsHeaderSafetyNet2048 unknown header被认为超过 2048  Header 已经足够安全了**必定已经在本地链上了不然前面肯定已经出错了**4. 在最后一次接收到一个空的 headers 列表的时候认为 Header 已经全部同步完成了那么判断本地当前的区块 td 是否已经不低于与  Peer 同步开始时的 td 如果是这样子的话那就把 rollback 列表清空也就不需要 rollback 不然在 `defer` 被调用的时候执行 `Rollback` 操作
  1. 对于 fast 和 full 同步模式下,将任务(Header)丢入到 downloader.queue 队列中,供后续做 block/receipt 调度下载(后文在 queue 模块中具体讲解);对于 light 模式来说,同步已经完成。

fetchParts 接口

dataflow:

  1. <-d.bodyCh && <- d.receiptCh
  2. d.queue.Deliver()

可以把 fetchParts 看作是数据下载的一个框架,提供了调度、流控、超时等机制,可用于不断迭代下载 Block 中的不同种类数据,对于具体类型数据的处理通过回调函数来实现。

processFullSyncContent

dataflow:

  1. d.queue.Results()

处理 full sync 模式下面区块数据的保存工作,从 downloader.queue 队列中取出数据,然后组装成完整的 Block 数据(包括 Tx, Uncle 和 Header 数据),调用 downloader.importBlockResults 执行 InsertChain,最终将 Block 写入到区块链上(也就是底层 LevelDB 中),前方讲到,downloader 模块中的 blockchain 为接口,这里实际上调用的是 core/blockchain.go 中定义的真实 BlockChain 结构,看下这里 insert 的具体过程:(TODO:补充流程图)

  1. 验证待 insert 的每一个区块是完整连续的,注意,这儿并没有验证这一段区块与 current block 也是连续的,那么这个验证在哪儿执行?还记得前文在讲解 blockchain reorg 的过程吗,那里会有这一“限制”;
  2. 调用共识算法验证待 insert 的每一个区块 Header;
  3. 如果区块 Header 验证通过,再调用 BlockChain Validator(core.BlockValidator) 验证每一个区块 Body(validator.ValidateBody);
  4. 对于验证通过的区块,调用 BlockChain Processor(core.StateProcessor) 生成 Receipt 和 Log,usedGas;
  5. 将上面生成的 Receipt,usedGas,以及变更了的 State 数据与 Block 数据进行验证(validator.ValidateState);
  6. 到此为止,说明这一段区块数据是完整了的,那么就可以插入到底层 chain 之中,调用 BlockChain.WriteBlockWithState 将 Block, Receipt, StateDB 持久化到 LevelDB 之中。

processFastSyncContent

dataflow:

  1. d.queue.Results()

与 full sync 相比,fast sync 虽然需要执行的事项少了,但是在插入的过程会复杂一些,首先体现在,这一函数需要传递 Peer 当前的区块 latest 参数进来,这一参数将直接决定 fast sync 的区块最高值($latest.Number - 64$),也就是 pivot block,从 queue 中接收到一串 Block 之后,比较这一 block 与 pivot block 的关系,在 pivot 之前直接插入 block 和 receipt, 后面的就需要和 full sync 一样执行 block 生成 receipt 之后再保存,其具体流程如下:

  1. 在没有提交过 fastBlock 的情况下(d.commited == 0),如果接收到的 Block 高度比当前的 pivot block 要高 2 * 64,那么更新 pivot 区块高度,很明显,这是在快要同步完成的时候发生的;
  2. 将收到的连续 Block 与 pivot block 比较
    1. pivot 前面的部分执行 downloader.commitFastSyncData 直接保存 Block 和 Receipt;
    2. 如果说 pivot 刚好在这一串 Block 之中,那么我们需要等待这一个 Block 所在的 StateDB Trie 数据已经全部同步好了,之后调用 downloader.commitPivotBlock 将 pivot 区块写入,现时置 d.commited = 1
    3. pivot 后面的部分与 full sync 一样,执行 downloader.importBlockResults

downloader.queue 调度、限流、pipeline 与高并发

eth/downloader/queue.go 提供了一种特殊的优先级队列,同时支持数据 pipeline,这一队列用于同步过程中最后的 ProcessXXXSyncContent 的阶段,为何一般的队列不能实现?原因大概有下面几点:

  1. 关于优先级的问题,在任务处理失败的时候(比如 Peer 掉线了),那么需要重新调度任务,一般的队列缺失优先级功能,不达标;
  2. 关于有序的问题,为提高消息处理能力,需要从多个 Peer 并发下载的能力,但是这样子直接下载的数据并不是按 number 有序排列的,由于 insert chain 必须从小到大有序,因此这也是这个优先级队列的第一重职责,以 number 负值为优先级的队列,在任务调度的时候优先处理 number 比较小的任务,在 pop 出队列的时候严格按照 number 排列(TODO: fast sync 中并不会执行交易,这个有序性是否可以弱化?);
  3. 关于任务重组的功能,这也是关键部分,这个队列中提供的主要功能是任务重组,由于 ProcessFastSyncContent 中需要同时消费 Block 与 Receipt 数据,而这两者又是分开下载的,必须两者同时存在的前提下,消费(insert chain)才有意义。

基于 Header 给予 downloader 在下载 Block/Receipt 时提供了调度和限流的功能,因此 queue 结构体分为三大类成员:

Header 相关

type queue struct {
    headerHead      common.Hash
    headerTaskPool  map[uint64]*types.Header
    headerTaskQueue *prque.Prque
    headerPeerMiss  map[string]map[uint64]struct{}
    headerPendPool  map[string]*fetchRequest
    headerResults   []*types.Header
    headerProced    int
    headerOffset    uint64
    headerContCh    chan bool
}

其中,

  • Queue 中存放以 number 为 key,以 number 负值为优先级的任务队列,也就是 number 越小,优先级越高;
  • Pool 中存放任务信息,有几种 Pool:
    • TaskPool: 以 number 为 key 的 Header 任务;
    • PendPool: 存放 peer id 为 key 的 request 请求体,避免下次收到非请节点发起的请求,在收到请求之后清除;

这儿的其它几个成员先不介绍,等到 DeliverHeaders 的时候再讲使用场景。

Block 相关

type queue struct {
    blockTaskPool  map[common.Hash]*types.Header
    blockTaskQueue *prque.Prque
    blockPendPool  map[string]*fetchRequest
    blockDonePool  map[common.Hash]struct{}
}

与 Header 相比,Block 相关的属性比较简单,但是 Queue 和 Pool 的使用稍微有一点差异:

  • Queue 中存放以 header hash 为 key,以 header number 负值为优化级的任务队列;
  • 一共有 3 个 Pool:
    • TaskPool:以 header hash 为 key 的 Header 任务;
    • PendPool: 存放 peer id 为 key 的 request 请求体;
    • DonePool: 实际上是一个 set 结构,以 header hash 为 key,保存已经完成的任务;

Receipt 相关

type queue struct {
    receiptTaskPool  map[common.Hash]*types.Header
    receiptTaskQueue *prque.Prque
    receiptPendPool  map[string]*fetchRequest
    receiptDonePool  map[common.Hash]struct{}
}

与 Block 相似,不续述了。

除此之外,还有几个成员:

type queue struct {
    resultCache  []*fetchResult
    resultOffset uint64
    resultSize   common.StorageSize

    lock   *sync.Mutex
    active *sync.Cond
    closed bool
}

其中,

active 是持有 lock Mutex 锁的条件变量,与 Mutex 锁机制用来保证在同一时刻仅有一个线程访问某一个共享数据不同的是,条件变量用于在对应的共享数据的状态发生变化时,通知其他因此而被阻塞的线程可以访问数据了。

  • resultCache 用于保存已经完成所有过程的任务,也是 Results 接口的数据来源,这一结构体在 Reserve 的章节详细讲解;
  • resultOffset 用于保存当前已经被业务层取回的区块 number,在 Results 接口被调用的时候被更新。

下面从 queue 的使用角度来看 downloader.queue 模块是如何提供服务的:

  • push: ScheduleSkeleton/Schedule 接口将 Header 入队列,后续基于队列中的 Header 作调度;

  • reconstruct: 这是一个复杂的过程,在上面 push 之后,队列中有了任务,此时 peer 开始申请下载各种数据

    • ReserveXXX 领取任务,由 downloader 里面的线程来执行;
    • DeliverXXX 接口重组 Block 与 Receipt;
  • pop: Results 从队列中返回当前已经完成的任务数据;

  • sequence:queue 中的数据必须按照 FIFO 的顺序有序返回;

注:从某种意义上来说,reconstruct 的过程也是 push 的一部分,这一过程至关重要,reconstruct 的结果直接影响到这个 Peer 的可用性。

中间还有一些对任务的额外控制:

  • ExpireXXX 用来控制任务是否超时;
  • CancelXXX 用来取消任务。

PUSH

ScheduleSkeleton 接口 -- PUSH Header 任务

ScheduleSkeleton 函数内部实现非常简单,将一串等间隔的 headers []*types.Header 插入到 header 任务池子中就完事了:

//eth/downloader/queue.go
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
    ...
    for i, header := range skeleton {
        index := from + uint64(i*MaxHeaderFetch)

        q.headerTaskPool[index] = header
        q.headerTaskQueue.Push(index, -int64(index))
    }
}

Pool 中保存的是与 index 相差 192 个区块的 Header。

Schedule 接口 -- PUSH Block/Receipt 任务

//eth/downloader/queue.go
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
    for _, header := range headers {
        hash := header.Hash()
        ...
        q.blockTaskPool[hash] = header
        q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))

        if q.mode == FastSync {
            q.receiptTaskPool[hash] = header
            q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
        }
    }
    ...
}

Schedule 申请对区块头进行 block 和 receipt 的下载调度(这里只是申请了下载而已,真正的下载操作在后续完成),在入队列的时候做了严格的合法性检查:

  1. 待入队列的区块 Number 必须是连续的,且开始于请求的区块高度上;
  2. 待入队列的区块的 parent hash 必须是连续不断的;
  3. 一旦任务已经在 blockTaskPool/receiptTaskPool 中了,说明是重叠了,跳过接下来的 push 操作; 由于检查的时候 from 是根据 push 到 queue 中的数量递增的,那么这里出现重叠只能是这一串 headers 中间连续出现了多个重复的 header 的情景;如果是两次请求中的数据重叠了,这种情况也是无法处理的。
  4. 以区块 number 负值为优先级 push 到 blockTaskQueue 中,同时保存 header 到 blockTaskPool 中;
  5. 如果是 fast sync 模式,同理,以区块 number 负值为优先级 push 到 receiptTaskQueue 中,同时保存 header 到 receiptTaskPool 中

上面两个 PUSH 的差异点在于:

  • ScheduleSkeleton 中保存的是不连续的 Header 任务;
  • Schedule 中保存的是连续的 Header 任务。

Reconstruct

Reconstruct 的过程可分为两步执行:

  1. Reserve 过程:Peer 申请下载某一个 Header(s) 对应的数据,将这一任务添加到 pendPool 中以作监控;
  2. Deliver 过程:把从 Peer 接收到的数据通过与 pendPool 中的请求作比对后,保存到 resultCache 之中。

Reserve 接口

ReserveHeaders

这一接口的实现比较简单,用于“预定”一个 Header,过程如下:

  1. 判断这个 peer 是否正在请求中,即位于 headerPendPool 之中,如果已经在请求中了,直接返回(让业务方再再尝试其它的 peer);

  2. 从 headerTaskQueue 中找到一个优先级最高的 number

    1. 如果这个 number 处于 headerPeerMiss 之中(在 Deliver 中讲解),说明这个 number 对于当前 peer 无效,继续 pop;
    2. 如果这个 number 是可用的,直接分配给 peer 构成一个 request;
  3. 对于无效的 number 需要重新导回到 headerTaskQueue 之中;

  4. 将这一 request 添加到 headerPendPool 中以作监控,然后将 request 返回给调用者。

ReserveBodies/Receipts

ReserveXXX 接口与 ReserveHeaders 基本流程是相似的,不同点在于:

  1. 这儿需要利用到 fetchRequest.Pending 字段,这一字段表示这一个 request 当前还需要等待的响应次数,对于 full sync 其值为 1,fast sync 则为 2;

  2. 这里多了一个 resultCache 结构,用来保存当前正在发出请求或者已经收到响应但还没有被消费的 request;

  3. 为每一个待请求的 Body/Receipt 在 resultCache 中增加一个以 header.Number - queue.resultOffset 为索引, request 为 value 的值,在下次接收到这一请求的消息响应的时候,将 request 结构体填充完整;

type resultCache []*fetchResult

TODO:resultCache 是一个 slice 结构,考虑下为何这儿要用这个值为索引,而不是直接往 slice 之中 append?

  1. ReserveHeaders 请求实质上是一个个 Header 请求,但是 ReserverXXX 可能是多个 Header 一起请求,由传入的参数以及 queue 中当前可用空间来共同决定,queue 也正是使用这一机制来控制流量吞吐率,在后文流量控制章节具体讲解;

  2. 如果遇到 Header 中显示这是一个空的 Block 或者 Receipt,也就是这一个区块中没有 Tx,也就意味着不需要去 Peer 下载 Block 和 Receipt,那么可以通知业务方现在已经有数据可供之使用了(可以插入到底层 chain 上了)。

DeliverHeaders 接口

image-20181022160522026

DeliverHeaders 接口在 fillHeaderSkeleton 的时候被调用到,用来把从 Peer 返回的 Skeleton 中间一整段 Headers 插入到 Skeleton 骨架上,这一整段 Headers 或者全部接收,或者全部被拒绝,无论接收与否将当前任务从 headerPendPool 中删除,表示这一任务已经完成了;如果这一段 Headers 被接收了,当前可用的 Headers(可能和刚刚接收的不完全一致) 扔到 headerProcCh 中,由此 downloader 可以调度处理 Block/Receipt 的任务,完整的调用过程如下图所示:

image-20181022174538420

DeliverBody/Receipt 接口

在了解 Block Receipt 被传送之前,先看下从 Peer 处获取到的数据的格式 fetchResult

//eth/downloader/queue.go
type fetchResult struct {
    Pending int
    Hash    common.Hash

    Header       *types.Header
    Uncles       []*types.Header
    Transactions types.Transactions
    Receipts     types.Receipts
}

其中,UnclesTransactions 一起通过 BlockBodiesMsg 消息返回,而 Receipts 通过 ReceiptsMsg 返回,因此 DeliverXXX 接口的用途就是把这两个不同消息的数据组合成一个消息。这个方法被调用的时候需要保证任务是由 ReserverXXX 发起的(即已经在 pendPool 中了),随后会将任务从 TaskPool 转移到 DonePool 中,同时通过条件变量唤醒正在等待数据的其它某一个线程(Results 的调用者)。

与 ReserveXXX 相似,在接收到 BlockBodiesMsg 或者是 ReceiptsMsg 都需要将 fetchRequest.Pending--, 当 fetchRequest.Pending 为零时,说明这一任务已经完成了,可用于数据流下一步消息处理了。

任务重排

queue 中提供了两个接口胜于重排任务,一个是重新编排已经超时的任务,还有一个是取消并重新开始任务。

ExpireXXX 接口

ExpireXXX 接口将过期任务从 pendPool 中移回到 taskQueue 中,下次重新新任务调度出去。注意,这儿不是将某一个任务 expire, 而是定期主动检查,对于超时任务定时重置。当前的使用场景是:

  1. 在每一次接收到需要下载的任务时,先检查一遍当前 pending 的任务中是否已经超时太多了的,

关于过期时间的计算为:

// eth/downloader/downloader.go
var ttlScaling = 3
var ttlLimit   = time.Minute
func (d *Downloader) requestTTL() time.Duration {
    var (
        rtt  = time.Duration(atomic.LoadUint64(&d.rttEstimate))
        conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
    )
    ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
    if ttl > ttlLimit {
        ttl = ttlLimit
    }
    return ttl
}

因此:我们认为一个请求超过了最长 1 分钟的都是过于陈旧的请求,应当被 expire。

CancelXXX 接口

CancelXXX 接口取消已经分配的某一个任务,由于取消的时候是指定 Header/Body/Receipt,直接将任务从对应的 pendPool 中重新加入到 taskQueue 中。

另外说一下,CancelXXX 接口在最早的 v1.2.x 版本中有使用,在 fetch 出问题的时候把这个 request 直接 cancel 掉,但是后面在 go-ethereum/downloader.go at 5b0ee8ec304663898073b7a4c659e1def23716df · ethereum/go-ethereum 的时候对于出错的 fetch 直接 panic 退出:

if err := fetch(peer, request); err != nil {
    // Although we could try and make an attempt to fix this, this error really
    // means that we've double allocated a fetch task to a peer. If that is the
    // case, the internal state of the downloader and the queue is very wrong so
    // better hard crash and note the error instead of silently accumulating into
    // a much bigger issue.
    panic(fmt.Sprintf("%v: %s fetch assignment failed, hard panic",
                      peer, strings.ToLower(kind)))
    cancel(request) // noop for now
}

POP

队列 POP 的过程就是将前面下载完成了的任务提交给调用者的一个过程,即将 resultCache 中 request.Pending 为 0 的元素依照 number 从小到大返回。

  1. 遍历 resultCache 找到最前面的 Pending 为 0 的 request;

    • 若没有找到且参数中指定 wait 状态,当前线程让出 CPU,等待有可用的 request 出现(使用条件变量 active 的通知机制);
    • 其它情况下没有找到的话,直接返回;
  2. 根据找到的 request, 将与之对应的任务从 blockDonePool 与 receiptDonePool 中删除;

  3. 将 resultCache 中剩下的 requst 移到 slice 的最前面,保证最前面的 request 会被最前取回;

  4. 更新 resultOffset 的值:queue.resultOffset += uint64(nproc),由此可见,resultOffset 是一个不够往后走的游标,标志着 resultCache[0] 的 number 值,因此等到下一次有 ReserveXXX 的时候,将 reserve.number - queue.resultoffset 对应为 resultCache 中的索引位置,见下图所示:

    image-20181024010239674

queue 工作流总结

其完整的工作流程如下图所示:

image-20181023113306565

  1. Prepare 中将当前待同步区块高度传入 queue 之中,设置为 queue.resultOffset;
  2. ReserveXXX 中使用 header.Number - queue.resultOffset 为索引保存 request 到 resultCache 之中;
  3. DeliverXXX 中使用 header.Number - queue.resultOffset 为索引更新 request 的内容;
  4. Results 中将 resultCache 中的内容有序弹出给调用者。

流量控制

主要取决于网络带宽使用情况,比如 ReserveBodies 的数量计算方法:

//eth/downloader/peer.go
func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
    p.lock.RLock()
    defer p.lock.RUnlock()

    speed := p.blockThroughput*float64(targetRTT)/float64(time.Second)
    return int(math.Min(1+math.Max(1, speed),  float64(MaxBlockFetch)))
}

其中 throughput 的计算分为两部分:

初始 throughput

在 Peer 被 Register 的时候,设置 Peer 的各个维度的 throughput, 设置规则如下:

$$ newPeer.througput = \frac{\sum_{i=0}^{len(peers)-1}peers[i].throughput}{len(peers)} $$

初始值设置为之前所有 Peer 的算术平均值。

调整 throughput

在 fetchXXX 的过程中,有两种可能会调整 throughput, 通过设置 setIdle 来实现,一种是在每次 deliver 数据之后,另一种是在 expire 的时候:

//eth/downloader/downloader.go
func (d *Downloader) fetchParts(...) error {
    for {
        select {
        case packet := deliveryCh:
            if peer := d.peers.Peer(packet.PeerId()); peer != nil {
                accepted, err := deliver(packet)
                if err != errInvalidChain && err != errStaleDelivery {
                    setIdle(peer, accepted)
                }
            }
        case <-update:
            for pid, fails := range expire() {
                if peer := d.peers.Peer(pid); peer != nil {
                    if fails > 2 {
                        setIdle(peer, 0)
                    }
                }
            }
        }
    }
}

内部调整的算法如下:

$$ \begin{align} throughput &= 0 \quad if \quad delivered = 0 \quad else \\ throughput &= \frac{delivered} {elapsedSecond}*MI + (1-MI)*throughput \quad where \\ MI &= 0.1 \end{align} $$

Fetcher

前文讲到,fetcher 是基于新出区块的同步,而新出的区块是通过 pm.minedBroadcastLoop 这一线程来处理的,这一线程中会前后发送两批消息,分别是 NewBlockMsgNewBlockHashesMsg,对于接收到这两个消息的节点会使用 fetcher 模块来处理。同时,在接收到 BlockHeadersMsgBlockBodiesMsg 的时候,需要过滤一下,让 fetcher 优先处理他感兴趣的内容,接下来才轮到 downloader 处理。

总结一下,接收到的与 fetcher 模块有关的消息:

msgCode fetcher invoked Description
NewBlockMsg Enqueue 接收到新出区块的 Body
NewBlockHashesMsg Notify 接收到新出区块的 Hash
BlockHeadersMsg FilterHeaders 接收到区块的 Headers 列表
BlockBodiesMsg FilterBodies 接收到区块的 Bodies 列表

接下来从每个场景分别分析数据流程。

在介绍 fetcher 之前,先介绍下几个用到的基本数据结构:

//eth/fetcher/fetcher.go
type Fetcher struct {
    // Various event channels
    notify       chan *announce
    inject       chan *inject
    headerFilter chan chan *headerFilterTask
    bodyFilter   chan chan *bodyFilterTask

    // Announce states
    announces  map[string]int
    announced  map[common.Hash][]*announce
    fetching   map[common.Hash]*announce
    fetched    map[common.Hash][]*announce
    completing map[common.Hash]*announce

    // Block cache
    queue  *prque.Prque
    queues map[string]int
    queued map[common.Hash]*inject

    // Callbacks
    getBlock       blockRetrievalFn
    verifyHeader   headerVerifierFn
    broadcastBlock blockBroadcasterFn
    chainHeight    chainHeightFn
    insertChain    chainInsertFn
    dropPeer       peerDropFn

    done chan common.Hash
    quit chan struct{}
}

可以将数据结构分成几大类:

  1. 用于传递数据的 channel 结构,这几个结构与 downloader 中 DeliverXXX 的使用相似,通过往 channel 中发送消息,达到数据传递的效果,针对上面讲的 fetcher 的四个场景,分别有四个 channel 处理不同的消息:

    • notify 用于 fetcher.Notifyfetcher 里面传递数据;
    • inject 用于 fetcher.Enqueuefetcher 里面传递数据;
    • headerFilter 用于 fetcher.FilterHeadersfetcher 里面传递数据;
    • bodyFilter 用于 fetcher.FilterBodiesfetcher 里面传递数据;
  2. 内部状态存储的 map 结构,存储与 Peer, 任务相关的数据,也称为状态数据:

    • announces 保存每个 Peer 发起的 NewBlockHashesMsg 消息的次数,避免 Peer 做恶意攻击,每 5 钞内 256 次请求(但是 fetcher 还没有完成 fetching)会被认为是攻击行为,不予处理。这里主要是限制每个 Peer 对于 fetcher 的内存占用,避免被 OOM;
    • announced 保存当前收到的 NewBlockHashesMsg 的消息分布情况 ,以 Hash 为 key,消息体数组为 value,代表了同一个 hash 来自不同 Peer 的消息体集合;
    • fetching 保存当前正在 fetching header 的消息,也就是正在执行的任务,这里的任务是根据 announced 中筛选出来的,规则见下文分析;
    • fetched 当前已经 fetch 了 header, 等待去 fetch body 的任务,与 announced 一样,value 也是消息体数组;
    • completing 当前已经 fetch 了 header 和 body 的任务
  3. 缓存任务的 queue 及相关结构

    • queue 以 number 有序的优先级队列,与 downloader 不一样的是,这里直接使用 prque 模块,进入这里的数据是完整的 types.Block 数据,这里的数据直接用于最后的 insert chain;
    • queuesannounces 类似又不同,保存每个 Peer 发起的 NewBlockMsg 消息的次数,与 announces 不同的是,累积 64 次不同 Hash 请求会被认为是攻击行为(也就是如果发送的是相同的 Hash,请求会被合并,只会被处理一次),不予处理(与 announces 一样,出于内存的考量);
    • queued 以 Hash 为 key 的 map, 保存当前已经 Enqueue 的任务。

      有人可能好奇了,queue 既然是有序的,那必然是唯一的,为何多用一个 map 结构保存 queue 中的任务呢? > 原因也比较简单,queue 中优先级使用的是 number,而这里保存的是以 hash 维度的消息,是两种索引方式,因此这一结构并不可少,以避免消息被重复消费。

  4. 回调函数,也称为闭包,封装用于链上行为的操作,这样子也是减少模块之间耦合的一种编程方式,回调函数的内部已经注入了其它模块的实体,因此这儿就不用再依赖于具体的模块,以 chainHeight 成员为例说明如下:

    //eth/fetcher/fetcher.go
    type headerRequesterFn func(common.Hash) error
    
    //eth/handler.go
    heighter := func() uint64 { return blockchain.CurrentBlock().NumberU64() }
    manager.fetcher = fetcher.New(,,,heighter...)

    因此,在 fetcher 内部,不需要再调用 blockchain 接口。

  5. 用于线程间通信的 channel 结构,这也是 channel 非常常用的功能:

    • done 用于表明当前 Block 已经完全 insert 到链上了,这一任务完成了,通知 fetcher 做一些善后工作(清理缓存,调整内部状态);
    • quit 用于中止 fetcher 继续。

fetcher.loop

一个 fetcher 线程被启动之后,进入 fetcher.loop,通过 channel 和 timer 实现一个 FSM 状态机

//eth/fetcher/fetcher.go
func (f *Fetcher) loop() {
    fetchTimer := time.NewTimer(0)
    completeTimer := time.NewTimer(0)

    for {
        for hash, announce := range f.fetching {
            if time.Since(announce.time) > fetchTimeout {
                f.forgetHash(hash)
            }
        }
        height := f.chainHeight()
        for !f.queue.Empty() {
            op := f.queue.PopItem().(*inject)

            number := op.block.NumberU64()
            if number > height+1 {
                f.queue.Push(op, -float32(op.block.NumberU64()))
                break
            }

            hash := op.block.Hash()
            if number+maxUncleDist < height || f.getBlock(hash) != nil {
                f.forgetBlock(hash)
                continue
            }
            f.insert(op.origin, op.block)
        }

        select {
        case <-f.quit:
            return

        case notification := <-f.notify:
            ...

        case op := <-f.inject:
            f.enqueue(op.origin, op.block)

        case hash := <-f.done:
            f.forgetHash(hash)
            f.forgetBlock(hash)

        case <-fetchTimer.C:
            ...

        case <-completeTimer.C:
            ...

        case filter := <-f.headerFilter:
            ...

        case filter := <-f.bodyFilter:
            ...
        }
    }
}

不难看出,其流程如下:

  1. 清理淘汰当前正在 fetching header 的超时任务($>5s$),淘汰细则见下文;

  2. 根据 fetcher.chainHeight() 得到当前的区块高度 $CH$,然后遍历当前队列中的消息(由 Enqueue 加入进来的)$op$,将合适的区块 insert 到本地 blockchain 中:

    1. 如果这一消息比当前区块高出不少($> (CH + 1) $):将消息 push 回队列,说明本地区块太旧了,不能 insert;
    2. 如果这一消息比当前区块低出不少($<(CH - 7) $):忽略这一个消息,尝试下一个消息,这其中的 $7$ 被认为是 uncle 区块最大的距离;
    3. 如果本地 blockchain 中已经有这一区块了:忽略这一个消息,尝试下一个消息;
    4. 剩下的情况,本地 blockchain 上还不存在,并且 $ op \in [CH - 7, CH +1] $,可以直接 insert 到 blockchain 上,insert 的逻辑后面再讲;
  3. 根据不同的 channel 中接收到的消息,走相应的状态变化:

    上面讲了 4 个 channel 分别用于 4 种不同的消息处理,额外还有两个可变定时器,用于过渡状态变更,接下来分别看待这些状态的变化过程。

约定

为方便起见,下面约定如下:

  • $P$ 代表 Peer;
  • $H$ 代表本地 handleMsg 线程;
  • $F$ 代表 fetcher 线程;
  • $CH$ 代表本地当前区块高度;
  • $PH$ 消息中给定的区块高度,即 Peer 发布新区块的高度;
  • 引用的代码片段中若不标注来源,均来自于 /eth/fetcher/fetcher.go, 请知晓;

Enqueue new block from NewBlockMsg

dataflow:

  1. <- f.inject
  2. -> f.queue

loop 中对于 Enqueue 的消息处理非常简单,直接调用 fetcher.enqueue 来处理,后者的流程也比较简单:

TODO:补充流程图

  1. 判断这一 Peer 的请求次数(f.queues 中统计的值)是否超过了 blockLimit(常量 64);

  2. 判断这一消息中的 number 高度是否合适:

    1. 如果 $CH - PH &gt; 7$ 认为太远了,应当忽略掉;
    2. 如果 $PH -CH &gt; 32$ 认为离当头区块太高了,没有必要处理,同样忽略掉;
  3. 如果本地的 queue 中还没有出现过这一消息,那么 push 到 queue 中,同时更新这一 peer 的请求次数 $++$

注:除了 Enqueue 会将任务插入到 queue 之后,还有其它方式入队列,包括 FilterHeaders 和 FilterBodies 两个调用。

Notify block hash from NewBlockHashesMsg

dataflow:

  1. <- f.notify
  2. -> f.announced

Enqueue 中接收到的消息已经是完整的 types.Block 结构,但是 Notify 中接收的是只有 Hash 信息,需要通过 Hash 到 Peer 反查得到 Block 完整信息,对本地区块链才可以处理。相比于 Enqueue 的流程,Notify 比较复杂了,先来看下,这儿使用的数据结构。

announce

type announce struct {
    hash   common.Hash
    number uint64
    header *types.Header
    time   time.Time

    origin string

    fetchHeader headerRequesterFn
    fetchBodies bodyRequesterFn
}

其中:

  • origin 存储 Peer id,上文讲到统计每个 Peer 的请求次数,就是以这个值为索引查询的;
  • fetchHeaderfetchBodiesdownloader 中的相似,不过由于这里是在 pm.handleMsg 中直接调用的,使用的是 eth/peer.go 模块(downloader 中使用的是 eth/downloader/peer.go, 在 eth/peer.go 的基础上封装了流控等过程)。

在 loop 中的处理,主要是检查一下然后加入了 announced 这个 Map, 等待定时处理。

case notification := <-f.notify:
    count := f.announces[notification.origin] + 1
    if count > hashLimit { break }

    if notification.number > 0 {
        if dist := int64(notification.number) - int64(f.chainHeight());
            dist < -maxUncleDist || dist > maxQueueDist {
            break
        }
    }

    if _, ok := f.fetching[notification.hash]; ok { break }
    if _, ok := f.completing[notification.hash]; ok { break }

    f.announces[notification.origin] = count
    f.announced[notification.hash] = append(f.announced[notification.hash],
                                            notification)
    if len(f.announced) == 1 {
        f.rescheduleFetch(fetchTimer)
    }

代码量比起 Enqueue 来看多了不少,不过流程还是清晰的:

  1. 判断这一 Peer 的请求次数(f.announces 中统计的值)是否超过了 hashLimit(常量 256);

  2. Enqueue 一样,判断这一消息中的 number 高度是否合适:

    1. 如果 $CH - PH &gt; 7$ 认为太远了,应当忽略掉;
    2. 如果 $PH -CH &gt; 32$ 认为离当头区块太高了,没有必要处理,同样忽略掉;
  3. 如果这一消息是否正在 fetching 或者已经 completing 了,那么直接返回,不需要处理了;

  4. 将这一消息追加到 announced[hash] 数组中(前文说过,announced 是以 hash 为 key 的 map 结构, value 是数组结构);

  5. 如果当前 announced 中只有一个成员,那么重置 fetch 定时器,将定时器触发时间调整为 $500ms - Notify.Time$,也就是在 500ms 以内 $F$ 会通过 fetcher 定时器超时向 $P$ 获取更多消息细节,这一 500ms 和只有一个成员才重置的设计更多的是照顾到 $P$ 的性能考虑,给予 $P$ 一点回落时间,缓和一下。

fetchTimer 超时处理

dataflow:

  1. <- f.announced
  2. -> f.fetching

上面讲到,Notify 接收到 hash 之后,需要等待一点点时间,然后去 $P$ 获取关于这个 hash 更多的细节,直接看定时器超时处理代码:

case <-fetchTimer.C:
    request := make(map[string][]common.Hash)

    for hash, announces := range f.announced {
        if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
            announce := announces[rand.Intn(len(announces))]
            f.forgetHash(hash)

            if f.getBlock(hash) == nil {
                request[announce.origin] = append(request[announce.origin], hash)
                f.fetching[hash] = announce
            }
        }
    }

    for peer, hashes := range request {
        fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
        go func() {
            for _, hash := range hashes { fetchHeader(hash) }
        }()
    }

    f.rescheduleFetch(fetchTimer)

细节部分如下:

  1. 遍历所有待 fetching 的消息(f.announced):

    1. 声明一个以 Peer 为 key, 请求消息体数组(考虑到一个 Peer 可能有多个请求的情况)为 value 的 map 结构 request;
    2. 如果某一个 hash 的存在时间已经超过了 400ms 以上(确保 $P$ 的缓和时间)并且本地 blockchain 中还没有,并做下面几件事:
      • f.announced[hash] 数组中随机选择一个消息追加到 request[hash] 中;
      • 同时对这一个 hash 执行 f.forgetHash
      • f.fetching 中记录这一 hash 及其任务;
  2. 对于 request 中的所有消息,以 Peer 为单位并发执行 fetchHeader 操作,一个 Peer 的不同 Hash 之间是同步操作;

    注意

    for peer, hashes := range request {
        fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
        go func() {
            for _, hash := range hashes { fetchHeader(hash) }
        }()
    }

    上面的 hashes := hashes 是线程安全的一种写法,在 for 循环中先捕获到 hashes 的值,Goroutine 可以直接使用,详见 Iteration Variables and Closures in "for" Statements · 50 Shades of Go: Traps, Gotchas, and Common Mistakes for New Golang Devs 的解释

  3. 如果 f.announced 不为空,重置 fetch 定时器,规则和前面的一样。

FilterHeaders to get my headers in BlockHeadersMsg

dataflow:

  1. <- headerTaskFilter
  2. -> f.fetched or -> f.queue
  3. -> remain back to caller

上一步的 fetchHeader 会通过 p2p 向 Peer 发送 GetBlockHeadersMsg 消息(包含一个 hash 值),故收到的也是一个 Header,而这一消息也有可能是通过 downloader 发送的,因此需要从接收的回应中过滤出由 fetcher 发出的消息,因此 fetcher 中取名 FilterHeaders 函数的原因,回顾过来看 handleMsg 中的处理:

//eth/handler.go
case msg.Code == BlockHeadersMsg:
    filter := len(headers) == 1
    if filter {
        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
            ...
            if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
                return err
            }
            return nil
        }
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
    }
    if len(headers) > 0 || !filter {
        err := pm.downloader.DeliverHeaders(p.id, headers)
    }

知道这个原因之后,我们就可以知道为何上面只在 len(headers) == 1 的时候执行过滤操作,并且验证 DAO 分叉的情况。

下面重点关注 FilterHeaders 内部的处理,与上面几个方法不同的,FilterHeaders 是一个同步调用的过程:将收到的一个 Header 扔给 fetcher, 后者检查是否是由自身发起的,将非自身发起的部分退回给 $H$,$H$ 上报消息给 downloader

传递任务

FilterHeaders 其内部实现流程如下,先来看 $H$ 如何将消息上报给 $F$ 的过程:

func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
    filter := make(chan *headerFilterTask)

    select {
    case f.headerFilter <- filter:
    case <-f.quit:
        return nil
    }

    select {
    case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
    case <-f.quit:
        return nil
    }

    select {
    case task := <-filter:
        return task.headers
    case <-f.quit:
        return nil
    }
}

其处理流程由三个 select 组成:

  1. f.headerFilter 中发送一个 filter, 注意 filter 的类型也是 channel;
  2. filter 中发送真实的任务(peer, headers, time);
  3. 等待内层 channel filter 返回数据。

这种 channel 内嵌套 channel 的使用我们还是第一次看到,channel 的经典用法是,一方发送,另一方接收,另一方接收之后处理消息,然后就没有然后了,并不会出现另一方接收了消息之后再往同一个 channel 中将结果发送回去。从工程学上考虑,往 channel 中再回传消息这种方式也是不合适的,往 channel 中发送的是任务,接收者处理之后是得到的是结果,这是两个类型,用一个 channel 是不合适的。因此这种 channel over channel 设计的好处就非常明显了:外部的 channel 用于发送任务,内部的 channel 用于回传结果,其一般的使用场景是:

  1. 发送者和接收者通过 channelA 建立一条消息处理通道;
  2. 发送者往 channelA 中发给接收者一个 channelB;
  3. 接收者根据监听的 channelA 上接收到的 channelB 中的内容,执行计算;
  4. 接收者完成计算之后,将结果反馈到 channelB 上,而发送者通过 channelB 刚好可以取到结果,从而形成了一个消息反馈机制。

处理任务

接下来看 fetcher 内部处理的过程:

case filter := <-f.headerFilter:
    var task *headerFilterTask
    select {
    case task = <-filter:
    case <-f.quit:
        return
    }

    unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
    for _, header := range task.headers {
        hash := header.Hash()
        if announce := f.fetching[hash]; announce != nil &&
            announce.origin == task.peer && f.fetched[hash] == nil &&
            f.completing[hash] == nil && f.queued[hash] == nil {

            if header.Number.Uint64() != announce.number {
                f.dropPeer(announce.origin)
                f.forgetHash(hash)
                continue
            }

            if f.getBlock(hash) == nil {
                announce.header, announce.time = header, task.time

                if header.TxHash == types.DeriveSha(types.Transactions{}) &&
                    header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
                    block := types.NewBlockWithHeader(header)
                    block.ReceivedAt = task.time

                    complete = append(complete, block)
                    f.completing[hash] = announce
                    continue
                }

                incomplete = append(incomplete, announce)
            } else {
                f.forgetHash(hash)
            }
        } else {
            unknown = append(unknown, header)
        }
    }

    select {
    case filter <- &headerFilterTask{headers: unknown, time: task.time}:
    case <-f.quit:
        return
    }

    for _, announce := range incomplete {
        hash := announce.header.Hash()
        if _, ok := f.completing[hash]; ok { continue }

        f.fetched[hash] = append(f.fetched[hash], announce)
        if len(f.fetched) == 1 { f.rescheduleComplete(completeTimer) }
    }

    for _, block := range complete {
        if announce := f.completing[block.Hash()]; announce != nil {
            f.enqueue(announce.origin, block)
        }
    }

接收到的 Hash 对于 fetcher 来说可能有三种状态:

  1. 不属于 fetcher 发送的,直接返回给调用者;

  2. fetcher 在 fetchTimer 定时器回调中发起的:

    1. 根据区块头查看,如果这个区块不包含任何交易或者是 Uncle 区块,那么不需要再下载 Body 了,直接放到 complete 中; 对于这些 complete 状态的任务,接下来的任务就是 insert chain 操作,fetcher 在这里复用了 f.queue 队列,因此直接将这些状态的任务 enqueue 操作,由其它过程调度 insert 即可
    2. 其它情况,放到 incomplete 中,等待下次调度执行; 将这些状态的任务放入到 f.fetched 中,同时与在 Notify 中的处理类似,如果 len(f.fetched) == 1 那么重置 complete 定时器在 100ms 以内。

completeTimer 超时处理

dataflow:

  1. <- f.fetched
  2. -> f.completing

与 fetcherTimer 类似,不过用于获取区块 Body,**注意,前者使用的是 fetchOneHeader, 而这里是通过 fetchBodies 一次将一个 Bode 全部取回来:

case <-completeTimer.C:
    request := make(map[string][]common.Hash)

    for hash, announces := range f.fetched {
        announce := announces[rand.Intn(len(announces))]
        f.forgetHash(hash)

        if f.getBlock(hash) == nil {
            request[announce.origin] = append(request[announce.origin], hash)
            f.completing[hash] = announce
        }
    }

    for peer, hashes := range request {
        go f.completing[hashes[0]].fetchBodies(hashes)
    }

    f.rescheduleComplete(completeTimer)

FilterBodies to get my bodies from BlockBodiesMsg

dataflow:

  1. <- bodyFilterTask
  2. -> f.queue
  3. -> remain back to caller

FilterHeaders 的处理类似,但是过滤的时候稍微复杂一点:

case filter := <-f.bodyFilter:
    var task *bodyFilterTask
    select {
    case task = <-filter:
    case <-f.quit:
        return
    }

    blocks := []*types.Block{}
    for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
        matched := false

        for hash, announce := range f.completing {
            if f.queued[hash] == nil {
                txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
                uncleHash := types.CalcUncleHash(task.uncles[i])

                if txnHash == announce.header.TxHash &&
                    uncleHash == announce.header.UncleHash &&
                    announce.origin == task.peer {
                    matched = true

                    if f.getBlock(hash) == nil {
                        block := types.NewBlockWithHeader(announce.header)
                            .WithBody(task.transactions[i], task.uncles[i])
                        block.ReceivedAt = task.time

                        blocks = append(blocks, block)
                    } else {
                        f.forgetHash(hash)
                    }
                }
            }
        }
        if matched {
            task.transactions = append(task.transactions[:i],
                                       task.transactions[i+1:]...)
            task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
            i--
            continue
        }
    }

    select {
    case filter <- task:
    case <-f.quit:
        return
    }

    for _, block := range blocks {
        if announce := f.completing[block.Hash()]; announce != nil {
            f.enqueue(announce.origin, block)
        }
    }

fetcher.insert 把给定的区块插入本地的区块链。

insert 的逻辑和 downloader 中的相似,将 Block 写入到本地 blockchain 之中;但也有不同,不同点在于 insert 完成之后,$F$ 与 miner 的行为一致,将这一 block/hash 转发给自己的 Peers:

func (f *Fetcher) insert(peer string, block *types.Block) {
    hash := block.Hash()

    go func() {
        defer func() { f.done <- hash }()

        parent := f.getBlock(block.ParentHash())
        if parent == nil { return }

        switch err := f.verifyHeader(block.Header()); err {
        case nil:
            go f.broadcastBlock(block, true)

        case consensus.ErrFutureBlock:
            // Weird future block, don't fail, but neither propagate

        default:
            f.dropPeer(peer)
            return
        }

        if _, err := f.insertChain(types.Blocks{block}); err != nil { return }

        go f.broadcastBlock(block, false)
    }()
}

insert 内部开启一个 goroutine 做下面的流程,如下:

  1. 先验证 Header, 如果验证通过,马上对区块 Body 进行广播;
  2. 将验证通过的 Block insert 到本地 blockchain 之中;
  3. 再广播一次这一区块的 Block Hash;
  4. 在操作完成之后,执行 insert 的 goroutine 发送本次执行的 hash 到 $F$ 线程 done channel 中,$F$ 在之后的调度中会去执行收尾工作。

收尾工作

收尾包括两部分内容:forgetHash 与 forgetBlock,这两者中会将前面提到过的中间状态还原,包括:

  • f.announced
  • f.fetching
  • f.fetched
  • f.completing
  • f.queues
  • f.queued

fetcher 模块总结

上述这些 fetcher 的中间过程,虽然离散错乱,但还是可以整合成一条完整的数据流:

image-20181025183722312

References