Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(quint): refactor the blocksync spec. #545

Merged
merged 40 commits into from
Nov 15, 2024
Merged
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e20871f
Start Blocksync
josef-widder Oct 8, 2024
6f3c593
new Height enabled. needs to be tested
josef-widder Oct 8, 2024
2dcaae4
blocksync fuctions done
josef-widder Oct 9, 2024
0f0e8f7
it is moving
josef-widder Oct 10, 2024
9b934ab
Daniel told me to push
josef-widder Oct 10, 2024
5f62609
syncing on the lucky path
josef-widder Oct 10, 2024
0118eb3
syncing
josef-widder Oct 10, 2024
ca13638
moved definitions a bit around
josef-widder Oct 30, 2024
c575052
code cleanup
josef-widder Oct 31, 2024
75b2c18
more cleanup
josef-widder Oct 31, 2024
b889cd0
polishing
josef-widder Oct 31, 2024
c6258da
Merge branch 'main' into josef/blocksync-retreat
cason Nov 5, 2024
3f49d59
fix(quint): fix test units with new driver's methods and types (#524)
cason Nov 8, 2024
06a0af2
Merge branch 'main' into josef/blocksync-retreat
cason Nov 8, 2024
19d10d9
Apply suggestions from code review
cason Nov 11, 2024
7713010
Merge branch 'main' into josef/blocksync-retreat
cason Nov 11, 2024
1882dbb
spec/quint: create a blocksync directory
cason Nov 8, 2024
9a3e76e
spec/quint: refactored spec into 4 files
cason Nov 11, 2024
30d4187
spec/quint: spacing changes
cason Nov 11, 2024
606b99d
spec/quint: spacing changes again
cason Nov 11, 2024
56a65a9
spec/quint: TODOs to render Server self-contained
cason Nov 11, 2024
d03d5fd
feat(spec): Blocksync (#462)
josef-widder Nov 11, 2024
39f3cf9
Merge branch 'main' into cason/462-blocksync
cason Nov 11, 2024
e617e81
spec/quint: spacing changes again
cason Nov 11, 2024
a96f7cc
specs/quint: removed original sync.qnt
cason Nov 11, 2024
bb9c380
spec/quint: fixing blocksync/sync.qnt module
cason Nov 11, 2024
390dbf1
spec/quint: sync Server has an id : Address
cason Nov 11, 2024
eaa90e7
spec/quint: move BlockStoreEntry to types.qnt
cason Nov 11, 2024
bee0d60
specs/quint: blocksync Server keeps a chain copy
cason Nov 11, 2024
96cdf25
specs/quint: pleasing the linter
cason Nov 11, 2024
bc24ff1
spec/quint: updateServer is now an action
cason Nov 14, 2024
de2da35
Merge branch 'main' into cason/462-blocksync
cason Nov 14, 2024
98fd7a4
Merge branch 'main' into cason/462-blocksync
cason Nov 15, 2024
23c4c54
spec/quint: move client actions to client.qnt
cason Nov 15, 2024
b967216
spec/quint: rename sync client to bsync client
cason Nov 15, 2024
b7cbbca
spec/quint: server to bsync_server, with its actions
cason Nov 15, 2024
0eebb58
spec/quint: moved buffers to blocksync.qnt
cason Nov 15, 2024
8e5757e
quint/spec: refactoring of client and server methods
cason Nov 15, 2024
540b873
Merge branch 'main' into cason/462-blocksync
cason Nov 15, 2024
1e564b0
spec/quint: pleasing the linter
cason Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions specs/quint/specs/blocksync/blocksync.qnt
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// -*- mode: Bluespec; -*-

module blocksync {

import types.* from "../types"
export types.*

type Option[a] =
| Some(a)
| None

type StatusMsg = {
peer: Address,
base: Height,
top: Height
}

type BlockRange = {
base: Height,
top: Height
}

type ReqType =
| SyncCertificate
| SyncBlock

type RequestMsg = {
client: Address,
server: Address,
rtype: ReqType,
height: Height
}

pure def emptyReqMsg = {
client: "",
server: "",
rtype: SyncCertificate,
height: -1
}

type Response =
| RespBlock(Proposal)
| RespCertificate(Set[Vote])

type ResponseMsg = {
client: Address,
server: Address,
height: Height,
response: Response,
}

pure def emptyResponseMsg = {
client: "",
server: "",
height: -1,
response: RespBlock(emptyProposal),
}

// *************
// State machine
// *************

// Messages exchanged by nodes (clients and servers)
var statusBuffer : Address -> Set[StatusMsg]
var requestsBuffer : Address -> Set[RequestMsg]
var responsesBuffer : Address -> Set[ResponseMsg]

// Auxiliary functions for sending messages

pure def broadcastStatusMsg(buffer: Address -> Set[StatusMsg], sm: StatusMsg): Address -> Set[StatusMsg] =
buffer.keys().mapBy(x => { ...buffer.get(x).union(Set(sm)) })

// put the response message in the buffer of the client
pure def sendResponse(buffer: Address -> Set[ResponseMsg], m: ResponseMsg): Address -> Set[ResponseMsg] =
buffer.put(m.client, { ...buffer.get(m.client).union(Set(m)) })

action initBsync(nodes) = all {
statusBuffer' = nodes.mapBy(v => Set()),
requestsBuffer' = nodes.mapBy(v => Set()),
responsesBuffer' = nodes.mapBy (v => Set()),
}

action unchangedBsync = all {
statusBuffer' = statusBuffer,
requestsBuffer' = requestsBuffer,
responsesBuffer' = responsesBuffer,
}
}
175 changes: 175 additions & 0 deletions specs/quint/specs/blocksync/client.qnt
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// -*- mode: Bluespec; -*-
//
// Blocksync protocol: client side.

module bsync_client {
import blocksync.* from "./blocksync"

/// The state of the synchronizer
type BsyncClient = {
id: Address,

peerStatus: Address -> BlockRange,
openRequests: Set[RequestMsg],

height: Height,
lastSyncedHeight: Height, // "done" if greater than or equal to height
// TODO: we could add buffers for certificates and values
// inbuffers
statusMsgs: Set[StatusMsg],
responseMsgs: Set[ResponseMsg],
}

type BsyncClientOutput =
| SOCertificate(Set[Vote])
| SOBlock(Proposal)
| SONoOutput

//
// BsyncClient functions
//

/// Initialize the synchronizer
pure def initBsyncClient(id: Address, peers: Set[Address]) : BsyncClient =
{
id: id,
peerStatus: peers.mapBy(x => {base:-1, top:-1}),
openRequests: Set(),
height: -1,
lastSyncedHeight: -1,
statusMsgs: Set(),
responseMsgs: Set(),
}

/// Auxiliary function to iterate over the received status messages
pure def updatePeerStatus (ps: Address -> BlockRange, msgs: Set[StatusMsg]) : Address -> BlockRange =
msgs.fold(ps, (newStatus , msg) =>
if (newStatus.get(msg.peer).top < msg.top) // TODO: think about base?
newStatus.put(msg.peer, {base: msg.base, top: msg.top})
else
newStatus
)

/// inform the synchronizer that consensus has entered height h
pure def syncNewHeight (s: BsyncClient, h: Height) : BsyncClient =
if (h <= s.height)
s
else
s.with("height", h)

/// returned by the synchronizer: sync is the new state, so is the output towards
/// the consensus driver, req are messages sent towards peers/servers
type ClientResult = {
sync: BsyncClient,
so: BsyncClientOutput,
req: Option[RequestMsg]
}

/// We have received a certificate. now we need to issue the
/// corresponding block request and generate a certificate output.
pure def syncHandleCertificate (s: BsyncClient, cert: Set[Vote], peer: str ) : ClientResult =
val blockReq = { client: s.id,
server: peer,
rtype: SyncBlock,
height: s.height}
{ sync: {...s, openRequests: Set(blockReq)}, // If we have parallelization we need to be more precise here
so: SOCertificate(cert),
req: Some(blockReq)}

/// we have received a block. now we need to generate a block output
pure def syncHandleBlock (s: BsyncClient, p: Proposal) : ClientResult =
{ sync: {...s, openRequests: Set(), // If we have parallelization we need to remove one element,
lastSyncedHeight: s.height }, // blockheight,
so: SOBlock(p),
req: None}

/// step of a client:
/// 1. update peer statuses, 2. if there is no open request, request something
/// 3. otherwise check whether we have a response and act accordingly
pure def bsyncClient (s: BsyncClient) : ClientResult =
val newPeerStates = updatePeerStatus(s.peerStatus, s.statusMsgs)
val newS = { ...s, peerStatus: newPeerStates}
if (s.lastSyncedHeight >= s.height)
// nothing to do
{ sync: newS,
so: SONoOutput,
req: None}
else
val goodPeers = s.peerStatus.keys().filter(p => newPeerStates.get(p).base <= s.height
and s.height <= newPeerStates.get(p).top )
if (goodPeers.size() > 0)
if (s.openRequests.size() == 0)
// we start the sync "round" by asking for a certificate
val req = { client: s.id,
server: goodPeers.fold("", (acc, i) => i), //chooseSome(),
rtype: SyncCertificate,
height: s.height}
{ sync: {... newS, openRequests: s.openRequests.union(Set(req))},
so: SONoOutput,
req: Some(req)
}
else
// we issued a request before, let's see whether there is a response
if (s.responseMsgs.size()> 0)
val resp = s.responseMsgs.fold(emptyResponseMsg, (acc, i) => i) //chooseSome() // in the future there might be parallelization
match resp.response {
| RespBlock(prop) => syncHandleBlock(newS, prop)
| RespCertificate(cert) => syncHandleCertificate(newS, cert, goodPeers.fold("", (s,x) => x))
}
else
// I don't have response
// this might be timeout logic
{ sync: newS,
so: SONoOutput,
req: None}
else
// no peers
{ sync: newS,
so: SONoOutput,
req: None}

// State machine

var bsyncClients: Address -> BsyncClient

action initClient(nodes) = all {
bsyncClients' = nodes.mapBy(v => initBsyncClient(v, nodes.exclude(Set(v)))),
}

action unchangedClient = all {
bsyncClients' = bsyncClients,
}

action newHeightClient(node, h) = all {
bsyncClients' = bsyncClients.put(node, syncNewHeight(bsyncClients.get(node), h)),
}

// deliver a status message, from the statusBuffer, to node
action deliverStatus(node) = all {
statusBuffer.get(node).size() > 0,
val client = bsyncClients.get(node)
nondet msg = statusBuffer.get(node).oneOf()
all {
bsyncClients' = bsyncClients.put(node, {... client,
statusMsgs: client.statusMsgs.union(Set(msg))}),
statusBuffer' = statusBuffer.put(node, statusBuffer.get(node).exclude(Set(msg))),
requestsBuffer' = requestsBuffer,
responsesBuffer' = responsesBuffer,
}
}

// deliver a response message, from responsesBuffer, to node
action deliverResponse(node) = all {
responsesBuffer.get(node).size() > 0,
val client = bsyncClients.get(node)
nondet msg = responsesBuffer.get(node).oneOf()
all {
bsyncClients' = bsyncClients.put(node, {... client,
responseMsgs: client.responseMsgs.union(Set(msg))}),
requestsBuffer' = requestsBuffer,
responsesBuffer' = responsesBuffer.put(node, responsesBuffer.get(node).exclude(Set(msg))),
statusBuffer' = statusBuffer,
}
}

}
125 changes: 125 additions & 0 deletions specs/quint/specs/blocksync/server.qnt
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// -*- mode: Bluespec; -*-
//
// Blocksync protocol: server side.

module bsync_server {
import blocksync.* from "./blocksync"

type Server = {
id: Address,

chain: List[BlockStoreEntry],

// Incoming requests
requestMsgs: Set[RequestMsg]
}

pure def newServer(addr: Address) : Server = {
id: addr,
chain: List(),
requestMsgs: Set()
}

// generate a status message
pure def syncStatus (server: Server) : StatusMsg =
// TODO: perhaps we should add to height to the chain entries to capture non-zero bases
{ peer: server.id , base: 0, top: server.chain.length() - 1 }

/// new server state and response messages to be sent
type ServerOutput = {
sync: Server,
msg: Option[ResponseMsg],
}

// main method: respond to incoming request, if any
pure def syncServer (s: Server) : ServerOutput =
if (s.requestMsgs.size() > 0)
val m = s.requestMsgs.fold(emptyReqMsg, (acc, i) => i) // chooseSome() // TODO: fix
val result =
if (m.height < s.chain.length())
match m.rtype {
| SyncCertificate =>
val cm = { client: m.client,
server: s.id,
height: m.height,
response: RespCertificate(s.chain[m.height].commit)}
Some(cm)
| SyncBlock =>
val bl = { client: m.client,
server: s.id,
height: m.height,
response: RespBlock(s.chain[m.height].decision)}
Some(bl)
}
else None
{ sync: { ...s, requestMsgs: s.requestMsgs.exclude(Set(m))},
msg: result}
else {
sync: s,
msg: None}

// State machine

var bsyncServers: Address -> Server

action initServer(nodes) = all {
bsyncServers' = nodes.mapBy(v => newServer(v)),
}

action unchangedServer = all {
bsyncServers' = bsyncServers,
}

// Deliver a request message, from requestsBuffer, to node
action deliverRequest(node) = all {
requestsBuffer.get(node).size() > 0,
val server = bsyncServers.get(node)
nondet msg = requestsBuffer.get(node).oneOf()
all {
bsyncServers' = bsyncServers.put(node, {... server,
requestMsgs: server.requestMsgs.union(Set(msg))}),
statusBuffer' = statusBuffer,
requestsBuffer' = requestsBuffer.put(node, requestsBuffer.get(node).exclude(Set(msg))),
responsesBuffer' = responsesBuffer,
}
}

// Server at node broadcasts its blockchain status
action broadcastStatus(node) = all {
val server = bsyncServers.get(node)
val msg = server.syncStatus()
all {
bsyncServers' = bsyncServers.put(node, server),
statusBuffer' = broadcastStatusMsg(statusBuffer, msg),
requestsBuffer' = requestsBuffer,
responsesBuffer' = responsesBuffer,
}
}

// Server at node takes a step (checking for requests and responding)
action stepServer(node) = all {
val server = bsyncServers.get(node)
val result = server.syncServer()
all {
bsyncServers' = bsyncServers.put(node, server),
statusBuffer' = statusBuffer,
requestsBuffer' = requestsBuffer,
responsesBuffer' = match result.msg {
| Some(m) => responsesBuffer.sendResponse(m)
| None => responsesBuffer
},
}
}

// Updates the server status, the latest available blockchain content.
// This action must be called by the component that knows the chain.
action updateServer(node, chain) = all {
val server = bsyncServers.get(node)
all {
chain != server.chain,
bsyncServers' = bsyncServers.put(node, { ...server, chain: chain }),
unchangedBsync,
}
}

}
254 changes: 254 additions & 0 deletions specs/quint/specs/blocksync/sync.qnt
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// -*- mode: Bluespec; -*-

module sync {

// General definitions
import blocksync.* from "./blocksync"

import statemachineAsync(
validators = Set("v1", "v2", "v3", "v4"),
validatorSet = Set("v1", "v2", "v3", "v4").mapBy(x => 1),
Faulty = Set("v1"),
Values = Set("red", "blue"),
Rounds = Set(0, 1, 2, 3),
Heights = Set(0) // , 1, 2, 3)
).* from "../statemachineAsync"

import bsync_client.* from "./client"
import bsync_server.* from "./server"

// ****************************************************************************
// State machine
// ****************************************************************************
//
// The statemachine is put on top of statemachineAsync, that is, we use its
// initialization and steps, and add the updates to the variables defined below
//

/// initializing the variables of the sync part of the state machine
action syncInit = all {
initClient(validators),
initServer(validators),
initBsync(validators),
}

action syncUnchangedAll = all {
unchangedServer,
unchangedClient,
unchangedBsync,
}

/// initialize consensus and synchronizer
action initAll = all {
init,
syncInit
}

//
// Actions for the Environment to send a node to a new height
//

/// environment sends the node to the next height.
/// initializes synchronizer
action newHeightActionSync(v, valset, h) = all {
newHeightClient(v, h),
unchangedBsync,
unchangedServer,
}

/// environment sends the node to the next height.
action newHeightActionAll(v, valset, h) = all {
newHeightActionSync(v, valset, h),
newHeightAction(v, valset, h),
}

//
// Actions for process steps in the sync protocol
//

// Update server v from the consensus' blockchain
action syncUpdateServer(v) = all {
val chain = system.get(v).es.chain
all {
updateServer(v, chain),
unchangedClient,
unchangedAll,
}
}

// Server v announces its status
action syncStatusStep(v) = all {
all {
broadcastStatus(v),
unchangedClient,
unchangedAll,
}
}

// Server v takes a step (checking for requests and responding)
action syncStepServer(v) = all {
all {
stepServer(v),
unchangedClient,
unchangedAll,
}
}

// Client v takes a step
//
// FIXME: here we need to apply the client output to the consensus
// state machine. Refactoring this method is much more complex.
//
action syncStepClient(v) = all {
val result = bsyncClient(bsyncClients.get(v))
all {
// TODO: this block should be implemented in client.qnt
bsyncClients' = bsyncClients.put(v, result.sync),
statusBuffer' = statusBuffer,
requestsBuffer' = match result.req {
| Some(m) => requestsBuffer.put(m.server, requestsBuffer.get(m.server).union(Set(m)))
| None => requestsBuffer
},
responsesBuffer' = responsesBuffer,

unchangedServer,

// This is the part that interacts with the consensus state
// machine. Moreover, the logic's code should be here, but due
// to Quint issues we need to keep it in stateMachineAsync.
putSyncOutputIntoNode(v, result.so),
}
}

//
// Actions for the environment to deliver messages in the sync protocol
// Implemented by the blocksync server or client.
//

// deliver a status message to client v
action syncDeliverStatus(v) = all {
deliverStatus(v),
unchangedServer,
unchangedAll
}

// deliver a response to client v
action syncDeliverResp(v) = all {
deliverResponse(v),
unchangedServer,
unchangedAll
}

// deliver a request to server v
action syncDeliverReq(v) = all {
deliverRequest(v),
unchangedClient,
unchangedAll
}

//
// Complex actions for a validator: consensus and blocksync
//

// validator step in the system with sync protocol
action syncValStep(v) = all {
valStep(v),
syncUnchangedAll
}

/// main step function: either a consensus state-machine step or a sync protocol step
action stepWithBlockSync = any {
all {
step,
syncUnchangedAll
},
nondet v = oneOf(Correct)
any {
syncDeliverReq(v),
syncDeliverResp(v),
syncDeliverStatus(v),
syncUpdateServer(v), // update the server with the latest state of the chain
syncStepServer(v), // looking for a request and sends responses
syncStepClient(v), // is checking if there are responses and check whether it need to requ
syncStatusStep(v),
//syncTimeout(v) // TODO:
}
}

//
// Interesting scenarios
//

/// auxiliary function for initHeight
/// sets the chain
pure def setChain(s: DriverState, c: List[{decision: Proposal, commit: Set[Vote]}]): DriverState =
{... s, chain: c}

/// auxiliary function for initHeight
/// constructs a commit certificate for a height and value
pure def commitSet (h: Height, v: Value) : Set[Vote] =
Set("v1", "v2", "v3").map(n => mkVote(Precommit, n, h, 0, v))

/// An action to set up an initial state with some nodes already decided up to height h
/// this sets up an initial state where v4 starts late, and v2 and v3 have reached
/// height h
action initHeight(h) = all {
val special = "v4" // TODO proper selection from correct set
val initsystem = Correct.mapBy(v =>
// hack
if (v == special) initNode(v, validatorSet, 0)
else initNode(v, validatorSet, h))
nondet decisionValMap = 0.to(h-1).setOfMaps(Values).oneOf()
val propMap = decisionValMap.keys().mapBy(i =>
mkProposal( proposer(validatorSet, i,0),
i,
0,
decisionValMap.get(i),
0))
val list = range(0, h).foldl(List(), (acc, i) => acc.append(propMap.get(i)))
val chain = list.foldl(List(), (acc, i) => acc.append({decision: i, commit: commitSet(i.height, Val(i.proposal))}))
all {
system' = initsystem.keys().mapBy(x =>
// hack
if (x == special) initsystem.get(x)
else {... initsystem.get(x), es: setChain(initsystem.get(x).es, chain) }),
propBuffer' = Correct.mapBy(v => Set()),
voteBuffer' = Correct.mapBy(v => Set()),
certBuffer' = Correct.mapBy(v => Set()),
_hist' = { validator: "INIT", input: NoDInput, output: NoConsensusOutput },
syncInit
}
}

/// a simple scenario where v4 starts height h
run syncCycle(h) =
newHeightActionAll("v4", validatorSet, h)
.then(syncStatusStep("v2"))
.then(syncDeliverStatus("v4"))
.then(syncStepClient("v4")) // ask for certificate
.then(syncDeliverReq("v2"))
.then(syncStepServer("v2"))
.then(syncDeliverResp("v4"))
.then(syncStepClient("v4")) // ask for block and give certificate to node
.expect(system.get("v4").incomingSyncCertificates.size() > 0)
.then(syncDeliverReq("v2"))
.then(syncStepServer("v2"))
.then(syncDeliverResp("v4"))
.then(syncStepClient("v4"))
.expect(system.get("v4").incomingSyncProposals.size() > 0)
.then(3.reps(_ => syncValStep("v4")))
.expect(system.get("v4").es.chain.length() > h)

run retreat =
nondet heightToReach = 1.to(4).oneOf()
initHeight( q::debug ("Height:", heightToReach))
// FIXME: I had to put it here, instead of syncCycle(h)
// This is not ideal, but it works.
.then(syncUpdateServer("v2"))
.then(heightToReach.reps(i => syncCycle(i)))
.expect(system.get("v4").es.chain == system.get("v2").es.chain)
.then(newHeightActionAll("v4", validatorSet, heightToReach))
.expect(system.get("v4").es.cs.height == system.get("v2").es.cs.height)
// and now v4 has synced !

}
5 changes: 0 additions & 5 deletions specs/quint/specs/driver.qnt
Original file line number Diff line number Diff line change
@@ -11,11 +11,6 @@ module driver {
// State
// *************************************************************************

type BlockStoreEntry = {
decision: Proposal,
commit: Set[Vote]
}

type DriverState = {
bk: Bookkeeper,
cs: ConsensusState,
545 changes: 0 additions & 545 deletions specs/quint/specs/sync.qnt

This file was deleted.

7 changes: 7 additions & 0 deletions specs/quint/specs/types.qnt
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ module types {
| Val(v) => v
| Nil => "getVal error" // this should not happen
}

pure def isValid(value: NonNilValue): bool =
// for simulation, if the value is "invalid", so is the proposal
// if this is to become "non-deterministic", we must push it
@@ -84,4 +85,10 @@ module types {
| Commit(Set[Vote])


// Committed blocks store
type BlockStoreEntry = {
decision: Proposal,
commit: Set[Vote]
}

}