Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(quint): refactor the blocksync spec. #545

Merged
merged 40 commits into from
Nov 15, 2024
Merged
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e20871f
Start Blocksync
josef-widder Oct 8, 2024
6f3c593
new Height enabled. needs to be tested
josef-widder Oct 8, 2024
2dcaae4
blocksync fuctions done
josef-widder Oct 9, 2024
0f0e8f7
it is moving
josef-widder Oct 10, 2024
9b934ab
Daniel told me to push
josef-widder Oct 10, 2024
5f62609
syncing on the lucky path
josef-widder Oct 10, 2024
0118eb3
syncing
josef-widder Oct 10, 2024
ca13638
moved definitions a bit around
josef-widder Oct 30, 2024
c575052
code cleanup
josef-widder Oct 31, 2024
75b2c18
more cleanup
josef-widder Oct 31, 2024
b889cd0
polishing
josef-widder Oct 31, 2024
c6258da
Merge branch 'main' into josef/blocksync-retreat
cason Nov 5, 2024
3f49d59
fix(quint): fix test units with new driver's methods and types (#524)
cason Nov 8, 2024
06a0af2
Merge branch 'main' into josef/blocksync-retreat
cason Nov 8, 2024
19d10d9
Apply suggestions from code review
cason Nov 11, 2024
7713010
Merge branch 'main' into josef/blocksync-retreat
cason Nov 11, 2024
1882dbb
spec/quint: create a blocksync directory
cason Nov 8, 2024
9a3e76e
spec/quint: refactored spec into 4 files
cason Nov 11, 2024
30d4187
spec/quint: spacing changes
cason Nov 11, 2024
606b99d
spec/quint: spacing changes again
cason Nov 11, 2024
56a65a9
spec/quint: TODOs to render Server self-contained
cason Nov 11, 2024
d03d5fd
feat(spec): Blocksync (#462)
josef-widder Nov 11, 2024
39f3cf9
Merge branch 'main' into cason/462-blocksync
cason Nov 11, 2024
e617e81
spec/quint: spacing changes again
cason Nov 11, 2024
a96f7cc
specs/quint: removed original sync.qnt
cason Nov 11, 2024
bb9c380
spec/quint: fixing blocksync/sync.qnt module
cason Nov 11, 2024
390dbf1
spec/quint: sync Server has an id : Address
cason Nov 11, 2024
eaa90e7
spec/quint: move BlockStoreEntry to types.qnt
cason Nov 11, 2024
bee0d60
specs/quint: blocksync Server keeps a chain copy
cason Nov 11, 2024
96cdf25
specs/quint: pleasing the linter
cason Nov 11, 2024
bc24ff1
spec/quint: updateServer is now an action
cason Nov 14, 2024
de2da35
Merge branch 'main' into cason/462-blocksync
cason Nov 14, 2024
98fd7a4
Merge branch 'main' into cason/462-blocksync
cason Nov 15, 2024
23c4c54
spec/quint: move client actions to client.qnt
cason Nov 15, 2024
b967216
spec/quint: rename sync client to bsync client
cason Nov 15, 2024
b7cbbca
spec/quint: server to bsync_server, with its actions
cason Nov 15, 2024
0eebb58
spec/quint: moved buffers to blocksync.qnt
cason Nov 15, 2024
8e5757e
quint/spec: refactoring of client and server methods
cason Nov 15, 2024
540b873
Merge branch 'main' into cason/462-blocksync
cason Nov 15, 2024
1e564b0
spec/quint: pleasing the linter
cason Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
blocksync fuctions done
josef-widder committed Oct 9, 2024

Verified

This commit was signed with the committer’s verified signature.
josef-widder Josef Widder
commit 2dcaae42b046d42d46bb97989145f905ded43567
4 changes: 4 additions & 0 deletions specs/quint/specs/driver.qnt
Original file line number Diff line number Diff line change
@@ -408,6 +408,8 @@ module driver {
incomingProposals: Set[Proposal],
incomingCertificates: Set[Set[Vote]],
// TODO: add buffers for sync
incomingSyncValues: Set[NonNilValue],
incomingSyncCertificates: Set[Set[Vote]],
getValueRequests: Set[(Height, Round)],
nextValueToPropose: Value,
}
@@ -418,6 +420,8 @@ module driver {
incomingVotes: Set(),
incomingProposals: Set(),
incomingCertificates: Set(),
incomingSyncProposals: Set(),
incomingSyncCertificates: Set(),
getValueRequests: Set(),
nextValueToPropose: Nil,
}
274 changes: 238 additions & 36 deletions specs/quint/specs/sync.qnt
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced by blocksync directory.

Original file line number Diff line number Diff line change
@@ -33,34 +33,68 @@ type RequestMsg = {
height: Height
}


type Response =
| RespBlock(NonNilValue)
| RespCertificate(Set[Vote])

type ResponseMsg = {
client: Address,
server: Address,
height: Height,
response: Response,
}



type SynchronizerOutput =
| SOCertificate(Set[Vote])
| SOBlock(Value)
| SOBlock(NonNilValue)
| SONoOutput

type Option[a] =
| Some(a)
| None

type Synchronizer = {
id: Address,
height: Height,
peers: Set[Address], // MVP: this is going to be the validator set for now
// so we use addresses. We might use peerID in the future
peerStatus: Address -> SyncStatus,
openRequests: Set[RequestMsg],
lastSyncedHeight: Height // done if greater than or equal to height
lastSyncedHeight: Height, // done if greater than or equal to height
// we could add buffers for certificates and values
statusMsgs: Set[SyncStatusMsg],
responses: Set[ResponseMsg],
}

pure def initSynchronizer(peers: Set[Address]) : Synchronizer =
type Server = {
inbuffer : Set[RequestMsg]
}

pure def initSynchronizer(id: Address, peers: Set[Address]) : Synchronizer =
{
id: id,
height: -1,
peers: peers,
peerStatus: peers.mapBy(x => {base:-1, top:-1}),
openRequests: Set(),
lastSyncedHeight: -1,
statusMsgs: Set(),
responses: Set(),
}

pure def updatePeerStatus (ps: Address -> SyncStatus, msgs: Set[SyncStatusMsg]) : Address -> SyncStatus =
msgs.fold(ps, (newStatus , msg) =>
if (newStatus.get(msg.peer).top < msg.top) // TODO: think about base?
newStatus.put(msg.peer, {base: msg.base, top: msg.top})
else
newStatus
)

pure def initServer = {inbuffer : Set()}

pure def syncNewHeight (s: Synchronizer, h: Height) : Synchronizer =
if (h <= s.height) s
else
@@ -69,30 +103,36 @@ pure def syncNewHeight (s: Synchronizer, h: Height) : Synchronizer =

pure def syncStatus (s: NodeState) : SyncStatusMsg =
//TODO: Logic
{ peer: s.es.cs.p , base: 5, top: 7 }
// TODO: perhaps we should add to height to the chain entries to capture non-zero bases
{ peer: s.es.cs.p , base: 0, top: s.es.chain.length() - 1 }



// State machine

var syncSystem: Address -> Synchronizer
var serverSystem: Address -> Server

var statusBuffer : Address -> Set[SyncStatusMsg]
var syncValueBuffer : Address -> Set[Proposal]
var syncCertBuffer : Address -> Set[Set[Vote]]
var syncResponseBuffer : Address -> Set[ResponseMsg]
var syncRequestBuffer : Address -> Set[RequestMsg]

pure def sendStatus(buffer: Address -> Set[SyncStatusMsg], sm: SyncStatusMsg): Address -> Set[SyncStatusMsg] =
pure def broadcastStatus(buffer: Address -> Set[SyncStatusMsg], sm: SyncStatusMsg): Address -> Set[SyncStatusMsg] =
buffer.keys().mapBy(x => { ...buffer.get(x).union(Set(sm)) })

/// put the response message in the boffer of the client
pure def sendResponse(buffer: Address -> Set[ResponseMsg], m: ResponseMsg): Address -> Set[ResponseMsg] =
buffer.put(m.client, { ...buffer.get(m.client).union(Set(m)) })



action newHeightAction(v, valset, h) = all {
val ns = system.get(v).with("es", initDriver(v, valset, h))
system' = system.put(v, ns),
syncSystem' = syncSystem.put(v, syncNewHeight(syncSystem.get(v), h)),
// rest unchanged
statusBuffer' = statusBuffer,
syncValueBuffer' = syncValueBuffer,
syncCertBuffer' = syncCertBuffer,
syncResponseBuffer' = syncResponseBuffer,
_hist' = _hist,
propBuffer' = propBuffer,
voteBuffer' = voteBuffer,
@@ -102,50 +142,212 @@ action newHeightAction(v, valset, h) = all {
action syncInit = all {
init,
statusBuffer' = Correct.mapBy(v => Set()),
syncResponseBuffer' = Correct.mapBy(v => Set()),
syncRequestBuffer' = Correct.mapBy(v => Set()),
syncSystem' = Correct.mapBy(v => initSynchronizer(v, validators)),
serverSystem' = Correct.mapBy(v => initServer),
}

action syncUnchangedAll = all {
statusBuffer' = statusBuffer
statusBuffer' = statusBuffer,
syncResponseBuffer' = syncResponseBuffer,
syncSystem' = syncSystem,
serverSystem' = serverSystem,
syncRequestBuffer' = syncRequestBuffer,
}


pure def syncClient (s: Synchronizer) :
{sync: Synchronizer, so: SynchronizerOutput, req: Option[RequestMsg]} =
// TODO logic
{
sync: s,
so: SONoOutput,
req: None
}
type ClientResult = {
sync: Synchronizer,
so: SynchronizerOutput,
req: Option[RequestMsg]
}

pure def syncServer (s: Synchronizer, ns: NodeState) :
{sync: Synchronizer, block: Value, cert: Set[Set[Vote]]} =
// TODO logic
// check for requests and serve them
// coupling to node state unclear
{

// {
// height: Height,
// peers: Set[Address], // MVP: this is going to be the validator set for now
// // so we use addresses. We might use peerID in the future
// peerStatus: Address -> SyncStatus,
// openRequests: Set[RequestMsg],
// lastSyncedHeight: Height, // done if greater than or equal to height
// // we could add buffers for certificates and values
// statusMsgs: Set[SyncStatusMsg],
// responses: Set[ResponseMsg],
// }

pure def syncClient (s: Synchronizer) : ClientResult =
val newPeerStates = updatePeerStatus(s.peerStatus, s.statusMsgs)
if (s.lastSyncedHeight >= s.height)
// nothing to do
{ sync: {...s, peerStatus: newPeerStates},
so: SONoOutput,
req: None}
else
// if there is a peer with a nice height, I am sending a request if I haven't done so before
// if received response
// if it is a certificate -> (check cerifcate) ask for the block from same peer
// -> give the certificate to consensus
// if i receive a block -> (check block) give the block to consensus
// -> nothing (newheight comes from outside)
val goodPeers = s.peers.filter(p => s.peerStatus.get(p).base <= s.height
and s.height < s.peerStatus.get(p).top )
if (goodPeers.size() > 0)
if (s.openRequests.size() == 0)
// we start the sync "round" by asking for a certificate
val req = { client: s.id,
server: goodPeers.chooseSome(),
rtype: SyncCertificate,
height: s.height}
{ sync: {...s,
peerStatus: newPeerStates,
openRequests: s.openRequests.union(Set(req))},
so: SONoOutput,
req: Some(req)
}
else
// we issued a request before, let's see whether there is a response
if (s. responses.size()> 0)
val resp = s.responses.chooseSome() // in the future there might be parallelization
val output = match resp.response {
// to be given to the context // TODO: checks for heights, validity
| RespBlock(value) => SOBlock(value)
| RespCertificate(cert) => SOCertificate(cert)
}
// TODO: refactor into handleBlock and handleCertificate
val request = match resp.response {
// if we received a certificate, we need to ask for a block
| RespCertificate(cert) =>
val blockreq = { client: s.id,
server: goodPeers.chooseSome(),
rtype: SyncBlock,
height: s.height}
Some(blockreq)
| RespBlock(value) => None
}
val newOpen = match resp.response {
// if we received a certificate, we need to record the block request, otherwise, we are done
| RespCertificate(cert) =>
match request {
| Some(blockreq) => Set(blockreq)
| None => Set()
}
| RespBlock(value) => Set() // If we have parallelization we need to remove one element
}
val newLast = match resp.response {
// if we received a certificate, we need to record the block request, otherwise, we are done
| RespCertificate(cert) => s.lastSyncedHeight
| RespBlock(value) => s.height // blockheight
}
{ sync: {...s,
peerStatus: newPeerStates,
openRequests: newOpen,
lastSyncedHeight: newLast},
so: output,
req: request}
else
// I don't have response
// this might be timeout logic
{ sync: {...s, peerStatus: newPeerStates},
so: SONoOutput,
req: None}
else
// no peers
{ sync: {...s, peerStatus: newPeerStates},
so: SONoOutput,
req: None}



type ServerOutput = {
sync: Server,
msg: Option[ResponseMsg],
}



pure def syncServer (s: Server, ns: NodeState) : ServerOutput =
if (s.inbuffer.size() > 0)
val m = s.inbuffer.chooseSome() // TODO: fix
val result =
if (m.height < ns.es.chain.length())
match m.rtype {
| SyncCertificate =>
val cm = { client: m.client,
server: m.server,
height: m.height,
response: RespCertificate(ns.es.chain[m.height].commit)}
Some(cm)
| SyncBlock =>
val bl = { client: m.client,
server: m.server,
height: m.height,
response: RespBlock(ns.es.chain[m.height].decision)}
Some(bl)
}
else None
{ sync: { ...s, inbuffer: s.inbuffer.exclude(Set(m))},
msg: result}
else {
sync: s,
block: Val("1"),
cert: Set()
}
msg: None}


action syncStepServer(v) = all {
val result = syncServer(syncSystem.get(v), system.get(v)) // this is where the chains is passed
unchangedAll,
statusBuffer' = statusBuffer

action syncStepServer(v) = all {
val result = syncServer(serverSystem.get(v), system.get(v)) // this is where the chains is passed
all {
unchangedAll,
statusBuffer' = statusBuffer,
syncResponseBuffer' = match result.msg {
| Some(m) => syncResponseBuffer.sendResponse(m) // TODO: fix after broadcast
| None => syncResponseBuffer
},
syncSystem' = syncSystem,
serverSystem' = serverSystem.put(v, result.sync),
syncRequestBuffer' = syncRequestBuffer,
}
}


action syncStepClient(v) = all {
// TODO call syncClient and deliver the SO to the node
unchangedAll,
statusBuffer' = statusBuffer
val result = syncClient(syncSystem.get(v))
all {
system' = match result.so {
// Put the sync output into the consensus engine
| SOCertificate(cert) =>
system.put(v, {... system.get(v),
incomingSyncCertificates:
system.get(v).incomingCertificates.union(Set(cert))})
| SOBlock(value) =>
system.put(v,
{... system.get(v),
incomingSyncValues: system.get(v).incomingSyncValues.union(Set(value))})
| SONoOutput => system
},
propBuffer' = propBuffer,
voteBuffer' = voteBuffer,
certBuffer' = certBuffer,
_hist' = _hist,
statusBuffer' = statusBuffer,
syncResponseBuffer' = syncResponseBuffer,
syncSystem' = syncSystem.put(v, result.sync),
serverSystem' = serverSystem,
syncRequestBuffer' = match result.req {
| Some(m) => syncRequestBuffer.put(m.server, syncRequestBuffer.get(m.server).union(Set(m)))
| None => syncRequestBuffer
},
}
}


action syncStatusStep(v) = all {
val newStatus = system.get(v).syncStatus()
statusBuffer' = sendStatus(statusBuffer, newStatus),
statusBuffer' = broadcastStatus(statusBuffer, newStatus),
syncResponseBuffer' = syncResponseBuffer,
syncSystem' = syncSystem,
serverSystem' = serverSystem,
syncRequestBuffer' = syncRequestBuffer,
unchangedAll
}

@@ -160,7 +362,7 @@ action stepWithBlockSync = any {
//syncDeliverReq(v),
//syncDeliverResp(v),
syncStepServer(v), //looking for a request and sends responses
syncStepClient(v), // is checking if there are responses and check whether it need to requ
// syncStepClient(v), // is checking if there are responses and check whether it need to requ
syncStatusStep(v),
//syncTimeout(v)
}