-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathindex.js
182 lines (151 loc) · 5.52 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
const path = require("path")
const grpc = require("grpc")
const protoLoader = require("@grpc/proto-loader")
const ProtoBuf = require("protobufjs")
const { createDfuseClient } = require("@dfuse/client")
// Global required by dfuse client
global.fetch = require("node-fetch")
global.WebSocket = require("ws")
const bstreamProto = loadProto("dfuse/bstream/v1/bstream.proto")
const eosioProto = loadProto("dfuse/eosio/codec/v1/codec.proto")
const bstreamService = loadGrpcPackageDefinition("dfuse/bstream/v1/bstream.proto").dfuse.bstream.v1
const blockMsg = bstreamProto.root.lookupType("dfuse.bstream.v1.Block")
const eosioBlockMsg = eosioProto.root.lookupType("dfuse.eosio.codec.v1.Block")
const forkStepEnum = bstreamProto.root.lookupEnum("dfuse.bstream.v1.ForkStep")
const forkStepNew = forkStepEnum.values["STEP_NEW"]
const forkStepUndo = forkStepEnum.values["STEP_UNDO"]
const forkStepIrreversible = forkStepEnum.values["STEP_IRREVERSIBLE"]
async function main() {
if (process.argv.length <= 2) {
console.error("Error: Wrong number of arguments")
console.error()
console.error("usage: node index.js <apiKey> [--full]")
process.exit(1)
}
const dfuse = createDfuseClient({
apiKey: process.argv[2],
network: "eos.firehose.eosnation.io",
})
const client = new bstreamService.BlockStreamV2(
"eos.firehose.eosnation.io:9000",
grpc.credentials.createSsl(), {
"grpc.max_receive_message_length": 1024 * 1024 * 100,
"grpc.max_send_message_length": 1024 * 1024 * 100
}
)
const showFull = process.argv.length > 3 && process.argv[3] == "--full"
try {
await new Promise(async (resolve, reject) => {
let stream
try {
const metadata = new grpc.Metadata()
metadata.set("authorization", (await dfuse.getTokenInfo()).token)
stream = client.Blocks(
{
start_block_num: 150000000,
stop_block_num: 150000005,
include_filter_expr: 'action == "onblock"',
// By default, step events received are `new`, `undo` and `irreversible`, for irreversible only, uncommented the following
// fork_steps: [forkStepIrreversible],
},
metadata
)
stream.on("data", (data) => {
const { block: rawBlock } = data
if (rawBlock.type_url !== "type.googleapis.com/dfuse.eosio.codec.v1.Block") {
rejectStream(stream, reject, invalidTypeError(rawBlock.type_url))
return
}
switch (data.step) {
case "STEP_NEW":
// Block is the new head block of the chain
break
case "STEP_UNDO":
// Block has been forked out, should undo everything
break
case "STEP_IRREVERSIBLE":
// Block is now irreversible, it's number will be ~360 blocks in the past
break
}
const step = data.step.toLowerCase().replace(/step_/, "")
const block = eosioBlockMsg.decode(rawBlock.value)
// Use the "filtered" versions since all blocks returned by Firehose is always filtered
const transactionCount = block.filteredTransactionTraces.length
let actionCount = 0
let systemActionCount = 0
block.filteredTransactionTraces.forEach((trace) => {
trace.actionTraces.forEach((actionTrace) => {
actionCount += 1
if (actionTrace.receiver === "eosio") {
systemActionCount += 1
}
})
})
console.log(
`Block #${block.number} (${block.id}, ${step}) - ${transactionCount} Transactions, ${actionCount} Actions (${systemActionCount} System Actions)`
)
if (showFull) {
console.log(JSON.stringify(block, null, " "))
}
})
stream.on("error", (error) => {
rejectStream(stream, reject, error)
})
stream.on("status", (status) => {
if (status.code === 0) {
resolveStream(stream, resolve)
return
}
// On error, I've seen the "error" callback receiving it, so not sure in which case we would do something else here
})
} catch (error) {
if (stream) {
rejectStream(stream, reject, error)
} else {
reject(error)
}
}
})
} finally {
// Clean up resources, should be performed only if the gRPC client (`client` here) and/or the dfuse client
// (`dfuse` here) are not needed anymore. If you have pending stream, you should **not** close those since
// they are required to make the stream works correctly.
client.close()
dfuse.release()
}
}
function loadGrpcPackageDefinition(package) {
const protoPath = path.resolve(__dirname, "proto", package)
const proto = protoLoader.loadSync(protoPath, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
})
return grpc.loadPackageDefinition(proto)
}
function loadProto(package) {
const protoPath = path.resolve(__dirname, "proto", package)
return ProtoBuf.loadSync(protoPath)
}
function resolveStream(stream, resolver) {
stream.cancel()
resolver()
}
function rejectStream(stream, rejection, error) {
stream.cancel()
rejection(error)
}
function invalidTypeError(type) {
return new Error(
`invalid message type '${type}' received, are you connecting to the right endpoint?`
)
}
main()
.then(() => {
console.log("Completed")
})
.catch((error) => {
console.error("An error occurred", error)
})