-
Notifications
You must be signed in to change notification settings - Fork 0
/
sync.go
329 lines (302 loc) · 10.5 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package trie
import (
"errors"
"fmt"
ogTypes "github.com/annchain/OG/og_interface"
"github.com/annchain/OG/ogdb"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
// ErrNotRequested is returned by the trie sync when it's requested to process a
// node it did not request.
var ErrNotRequested = errors.New("not requested")
// ErrAlreadyProcessed is returned by the trie sync when it's requested to process a
// node it already processed previously.
var ErrAlreadyProcessed = errors.New("already processed")
// request represents a scheduled or already in-flight state retrieval request.
type request struct {
hash ogTypes.Hash // Hash of the node data content to retrieve
data []byte // Data content of the node, cached until all subtrees complete
raw bool // Whether this is a raw entry (code) or a trie node
parents []*request // Parent state nodes referencing this entry (notify all upon completion)
depth int // Depth level within the trie the node is located to prioritise DFS
deps int // Number of dependencies before allowed to commit this node
callback LeafCallback // Callback to invoke if a leaf node it reached on this branch
}
// SyncResult is a simple list to return missing nodes along with their request
// hashes.
type SyncResult struct {
Hash ogTypes.Hash // Hash of the originally unknown trie node
Data []byte // Data content of the retrieved node
}
// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
// persisted data items.
type syncMemBatch struct {
batch map[ogTypes.HashKey][]byte // In-memory membatch of recently completed items
order []ogTypes.Hash // Order of completion to prevent out-of-order data loss
}
// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
func newSyncMemBatch() *syncMemBatch {
return &syncMemBatch{
batch: make(map[ogTypes.HashKey][]byte),
order: make([]ogTypes.Hash, 0, 256),
}
}
// Sync is the main state trie synchronisation scheduler, which provides yet
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
type Sync struct {
database DatabaseReader // Persistent database to check for existing entries
membatch *syncMemBatch // Memory buffer to avoid frequest database writes
requests map[ogTypes.HashKey]*request // Pending requests pertaining to a key hash
queue *prque.Prque // Priority queue with the pending requests
}
// NewSync creates a new trie data download scheduler.
func NewSync(root ogTypes.Hash, database DatabaseReader, callback LeafCallback) *Sync {
ts := &Sync{
database: database,
membatch: newSyncMemBatch(),
requests: make(map[ogTypes.HashKey]*request),
queue: prque.New(),
}
ts.AddSubTrie(root, 0, &ogTypes.Hash32{}, callback)
return ts
}
// AddSubTrie registers a new trie to the sync code, rooted at the designated parent.
func (s *Sync) AddSubTrie(root ogTypes.Hash, depth int, parent ogTypes.Hash, callback LeafCallback) {
// Short circuit if the trie is empty or already known
if root.HashKey() == emptyRoot.HashKey() {
return
}
if _, ok := s.membatch.batch[root.HashKey()]; ok {
return
}
key := root.Bytes()
blob, _ := s.database.Get(key)
if local, err := decodeNode(key, blob, 0); local != nil && err == nil {
return
}
// Assemble the new sub-trie sync request
req := &request{
hash: root,
depth: depth,
callback: callback,
}
// If this sub-trie has a designated parent, link them together
if parent.HashKey() != (ogTypes.Hash32{}.HashKey()) {
ancestor := s.requests[parent.HashKey()]
if ancestor == nil {
panic(fmt.Sprintf("sub-trie ancestor not found: %s", parent.HashKey()))
}
ancestor.deps++
req.parents = append(req.parents, ancestor)
}
s.schedule(req)
}
// AddRawEntry schedules the direct retrieval of a state entry that should not be
// interpreted as a trie node, but rather accepted and stored into the database
// as is. This method's goal is to support misc state metadata retrievals (e.g.
// contract code).
func (s *Sync) AddRawEntry(hash ogTypes.Hash, depth int, parent ogTypes.Hash) {
// Short circuit if the entry is empty or already known
if hash == emptyState {
return
}
if _, ok := s.membatch.batch[hash.HashKey()]; ok {
return
}
if ok, _ := s.database.Has(hash.Bytes()); ok {
return
}
// Assemble the new sub-trie sync request
req := &request{
hash: hash,
raw: true,
depth: depth,
}
// If this sub-trie has a designated parent, link them together
if parent.HashKey() != (ogTypes.Hash32{}.HashKey()) {
ancestor := s.requests[parent.HashKey()]
if ancestor == nil {
panic(fmt.Sprintf("raw-entry ancestor not found: %s", parent.HashKey()))
}
ancestor.deps++
req.parents = append(req.parents, ancestor)
}
s.schedule(req)
}
// Missing retrieves the known missing nodes from the trie for retrieval.
func (s *Sync) Missing(max int) []ogTypes.Hash32 {
var requests []ogTypes.Hash32
for !s.queue.Empty() && (max == 0 || len(requests) < max) {
requests = append(requests, s.queue.PopItem().(ogTypes.Hash32))
}
return requests
}
// Process injects a batch of retrieved trie nodes data, returning if something
// was committed to the database and also the index of an entry if processing of
// it failed.
func (s *Sync) Process(results []SyncResult) (bool, int, error) {
committed := false
for i, item := range results {
// If the item was not requested, bail out
request := s.requests[item.Hash.HashKey()]
if request == nil {
return committed, i, ErrNotRequested
}
if request.data != nil {
return committed, i, ErrAlreadyProcessed
}
// If the item is a raw entry request, commit directly
if request.raw {
request.data = item.Data
s.commit(request)
committed = true
continue
}
// Decode the node data content and update the request
node, err := decodeNode(item.Hash.Bytes(), item.Data, 0)
if err != nil {
return committed, i, err
}
request.data = item.Data
// Create and schedule a request for all the children nodes
requests, err := s.children(request, node)
if err != nil {
return committed, i, err
}
if len(requests) == 0 && request.deps == 0 {
s.commit(request)
committed = true
continue
}
request.deps += len(requests)
for _, child := range requests {
s.schedule(child)
}
}
return committed, 0, nil
}
// Commit flushes the data stored in the internal membatch out to persistent
// storage, returning the number of items written and any occurred error.
func (s *Sync) Commit(dbw ogdb.Putter) (int, error) {
// Dump the membatch into a database dbw
for i, key := range s.membatch.order {
if err := dbw.Put(key.Bytes(), s.membatch.batch[key.HashKey()]); err != nil {
return i, err
}
}
written := len(s.membatch.order)
// Drop the membatch data and return
s.membatch = newSyncMemBatch()
return written, nil
}
// Pending returns the number of state entries currently pending for download.
func (s *Sync) Pending() int {
return len(s.requests)
}
// schedule inserts a new state retrieval request into the fetch queue. If there
// is already a pending request for this node, the new request will be discarded
// and only a parent reference added to the old one.
func (s *Sync) schedule(req *request) {
// If we're already requesting this node, add a new reference and stop
if old, ok := s.requests[req.hash.HashKey()]; ok {
old.parents = append(old.parents, req.parents...)
return
}
// Schedule the request for future retrieval
s.queue.Push(req.hash, float32(req.depth))
s.requests[req.hash.HashKey()] = req
}
// children retrieves all the missing children of a state trie entry for future
// retrieval scheduling.
func (s *Sync) children(req *request, object Node) ([]*request, error) {
// Gather all the children of the node, irrelevant whether known or not
type child struct {
node Node
depth int
}
children := []child{}
switch node := (object).(type) {
case *ShortNode:
children = []child{{
node: node.Val,
depth: req.depth + len(node.Key),
}}
case *FullNode:
for i := 0; i < 17; i++ {
if node.Children[i] != nil {
children = append(children, child{
node: node.Children[i],
depth: req.depth + 1,
})
}
}
default:
panic(fmt.Sprintf("unknown node: %+v", node))
}
// Iterate over the children, and request all unknown ones
requests := make([]*request, 0, len(children))
for _, child := range children {
// Notify any external watcher of a new key/value node
if req.callback != nil {
if node, ok := (child.node).(ValueNode); ok {
if err := req.callback(node, req.hash); err != nil {
return nil, err
}
}
}
// If the child references another node, resolve or schedule
if node, ok := (child.node).(HashNode); ok {
// Try to resolve the node from the local database
hash := ogTypes.BytesToHash(node).(*ogTypes.Hash32)
if _, ok := s.membatch.batch[hash.HashKey()]; ok {
continue
}
if ok, _ := s.database.Has(node); ok {
continue
}
// Locally unknown node, schedule for retrieval
requests = append(requests, &request{
hash: hash,
parents: []*request{req},
depth: child.depth,
callback: req.callback,
})
}
}
return requests, nil
}
// commit finalizes a retrieval request and stores it into the membatch. If any
// of the referencing parent requests complete due to this commit, they are also
// committed themselves.
func (s *Sync) commit(req *request) (err error) {
// Write the node content to the membatch
s.membatch.batch[req.hash.HashKey()] = req.data
s.membatch.order = append(s.membatch.order, req.hash)
delete(s.requests, req.hash.HashKey())
// Check all parents for completion
for _, parent := range req.parents {
parent.deps--
if parent.deps == 0 {
if err := s.commit(parent); err != nil {
return err
}
}
}
return nil
}