Skip to content

Commit

Permalink
[Keystone][Deployments] Make adding NOPs and DONs idempotent
Browse files Browse the repository at this point in the history
Similarly to adding capabilities or nodes, don't add NOPs or DONs that already exist.
  • Loading branch information
bolekk committed Nov 8, 2024
1 parent 4bec5f4 commit 939f9f6
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 31 deletions.
2 changes: 1 addition & 1 deletion deployment/keystone/changeset/internal/test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func deployCapReg(t *testing.T, lggr logger.Logger, chain deployment.Chain) *kcr
}

func addNops(t *testing.T, lggr logger.Logger, chain deployment.Chain, registry *kcr.CapabilitiesRegistry, nops []kcr.CapabilitiesRegistryNodeOperator) *kslib.RegisterNOPSResponse {
resp, err := kslib.RegisterNOPS(context.TODO(), kslib.RegisterNOPSRequest{
resp, err := kslib.RegisterNOPS(context.TODO(), lggr, kslib.RegisterNOPSRequest{
Chain: chain,
Registry: registry,
Nops: nops,
Expand Down
110 changes: 84 additions & 26 deletions deployment/keystone/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func ConfigureRegistry(ctx context.Context, lggr logger.Logger, req ConfigureCon
for _, nop := range nodeIdToNop {
nops = append(nops, nop)
}
nopsResp, err := RegisterNOPS(ctx, RegisterNOPSRequest{
nopsResp, err := RegisterNOPS(ctx, lggr, RegisterNOPSRequest{
Chain: registryChain,
Registry: registry,
Nops: nops,
Expand Down Expand Up @@ -231,7 +231,7 @@ func ConfigureRegistry(ctx context.Context, lggr logger.Logger, req ConfigureCon
if err != nil {
return nil, fmt.Errorf("failed to register DONS: %w", err)
}
lggr.Infow("registered DONS", "dons", len(donsResp.donInfos))
lggr.Infow("registered DONs", "dons", len(donsResp.donInfos))

return &ConfigureContractsResponse{
Changeset: &deployment.ChangesetOutput{
Expand Down Expand Up @@ -371,6 +371,7 @@ func registerCapabilities(lggr logger.Logger, req registerCapabilitiesRequest) (
if len(req.donToCapabilities) == 0 {
return nil, fmt.Errorf("no capabilities to register")
}
lggr.Infow("registering capabilities...", "len", len(req.donToCapabilities))
resp := &registerCapabilitiesResponse{
donToCapabilities: make(map[string][]RegisteredCapability),
}
Expand Down Expand Up @@ -421,8 +422,37 @@ type RegisterNOPSResponse struct {
Nops []*kcr.CapabilitiesRegistryNodeOperatorAdded
}

func RegisterNOPS(ctx context.Context, req RegisterNOPSRequest) (*RegisterNOPSResponse, error) {
nops := req.Nops
func RegisterNOPS(ctx context.Context, lggr logger.Logger, req RegisterNOPSRequest) (*RegisterNOPSResponse, error) {
lggr.Infow("registering node operators...", "len", len(req.Nops))
existingNops, err := req.Registry.GetNodeOperators(&bind.CallOpts{})
if err != nil {
return nil, err
}
existingNopsAddrToID := make(map[capabilities_registry.CapabilitiesRegistryNodeOperator]uint32)
for id, nop := range existingNops {
existingNopsAddrToID[nop] = uint32(id)
}
lggr.Infow("fetched existing node operators", "len", len(existingNopsAddrToID))
resp := &RegisterNOPSResponse{
Nops: []*kcr.CapabilitiesRegistryNodeOperatorAdded{},
}
nops := []kcr.CapabilitiesRegistryNodeOperator{}
for _, nop := range req.Nops {
if id, ok := existingNopsAddrToID[nop]; !ok {
nops = append(nops, nop)
} else {
lggr.Debugw("node operator already exists", "name", nop.Name, "admin", nop.Admin.String(), "id", id)
resp.Nops = append(resp.Nops, &kcr.CapabilitiesRegistryNodeOperatorAdded{
NodeOperatorId: id,
Name: nop.Name,
Admin: nop.Admin,
})
}
}
if len(nops) == 0 {
lggr.Debug("no new node operators to register")
return resp, nil
}
tx, err := req.Registry.AddNodeOperators(req.Chain.DeployerKey, nops)
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
Expand All @@ -442,15 +472,12 @@ func RegisterNOPS(ctx context.Context, req RegisterNOPSRequest) (*RegisterNOPSRe
if len(receipt.Logs) != len(nops) {
return nil, fmt.Errorf("expected %d log entries for AddNodeOperators, got %d", len(nops), len(receipt.Logs))
}
resp := &RegisterNOPSResponse{
Nops: make([]*kcr.CapabilitiesRegistryNodeOperatorAdded, len(receipt.Logs)),
}
for i, log := range receipt.Logs {
o, err := req.Registry.ParseNodeOperatorAdded(*log)
if err != nil {
return nil, fmt.Errorf("failed to parse log %d for operator added: %w", i, err)
}
resp.Nops[i] = o
resp.Nops = append(resp.Nops, o)
}

return resp, nil
Expand Down Expand Up @@ -531,6 +558,7 @@ type registerNodesResponse struct {
// can sign the transactions update the contract state
// TODO: 467 refactor to support MCMS. Specifically need to separate the call data generation from the actual contract call
func registerNodes(lggr logger.Logger, req *registerNodesRequest) (*registerNodesResponse, error) {
lggr.Infow("registering nodes...", "len", len(req.nodeIdToNop))
nopToNodeIDs := make(map[kcr.CapabilitiesRegistryNodeOperator][]string)
for nodeID, nop := range req.nodeIdToNop {
if _, ok := nopToNodeIDs[nop]; !ok {
Expand Down Expand Up @@ -623,7 +651,7 @@ func registerNodes(lggr logger.Logger, req *registerNodesRequest) (*registerNode
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
if strings.Contains(err.Error(), "NodeAlreadyExists") {
lggr.Warnw("node already exists, skipping", "p2pid", singleNodeParams.P2pId)
lggr.Warnw("node already exists, skipping", "p2pid", hex.EncodeToString(singleNodeParams.P2pId[:]))
continue
}
return nil, fmt.Errorf("failed to call AddNode for node with p2pid %v: %w", singleNodeParams.P2pId, err)
Expand Down Expand Up @@ -672,13 +700,22 @@ func sortedHash(p2pids [][32]byte) string {
}

func registerDons(lggr logger.Logger, req registerDonsRequest) (*registerDonsResponse, error) {
resp := registerDonsResponse{
donInfos: make(map[string]kcr.CapabilitiesRegistryDONInfo),
}
lggr.Infow("registering DONs...", "len", len(req.donToOcr2Nodes))
// track hash of sorted p2pids to don name because the registry return value does not include the don name
// and we need to map it back to the don name to access the other mapping data such as the don's capabilities & nodes
p2pIdsToDon := make(map[string]string)
var registeredDons = 0
var addedDons = 0

donInfos, err := req.registry.GetDONs(&bind.CallOpts{})
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
return nil, fmt.Errorf("failed to call GetDONs: %w", err)
}
existingDONs := make(map[string]struct{})
for _, donInfo := range donInfos {
existingDONs[sortedHash(donInfo.NodeP2PIds)] = struct{}{}
}
lggr.Infow("fetched existing DONs...", "len", len(donInfos), "lenByNodesHash", len(existingDONs))

for don, ocr2nodes := range req.donToOcr2Nodes {
var p2pIds [][32]byte
Expand All @@ -695,6 +732,12 @@ func registerDons(lggr logger.Logger, req registerDonsRequest) (*registerDonsRes

p2pSortedHash := sortedHash(p2pIds)
p2pIdsToDon[p2pSortedHash] = don

if _, ok := existingDONs[p2pSortedHash]; ok {
lggr.Debugw("don already exists, ignoring", "don", don, "p2p sorted hash", p2pSortedHash)
continue
}

caps, ok := req.donToCapabilities[don]
if !ok {
return nil, fmt.Errorf("capabilities not found for node operator %s", don)
Expand Down Expand Up @@ -728,44 +771,59 @@ func registerDons(lggr logger.Logger, req registerDonsRequest) (*registerDonsRes
return nil, fmt.Errorf("failed to confirm AddDON transaction %s for don %s: %w", tx.Hash().String(), don, err)
}
lggr.Debugw("registered DON", "don", don, "p2p sorted hash", p2pSortedHash, "cgs", cfgs, "wfSupported", wfSupported, "f", f)
registeredDons++
addedDons++
}
lggr.Debugf("Registered all DONS %d, waiting for registry to update", registeredDons)
lggr.Debugf("Registered all DONs (new=%d), waiting for registry to update", addedDons)

// occasionally the registry does not return the expected number of DONS immediately after the txns above
// so we retry a few times. while crude, it is effective
var donInfos []capabilities_registry.CapabilitiesRegistryDONInfo
var err error
foundAll := false
for i := 0; i < 10; i++ {
lggr.Debug("attempting to get DONS from registry", i)
lggr.Debugw("attempting to get DONs from registry", "attempt#", i)
donInfos, err = req.registry.GetDONs(&bind.CallOpts{})
if len(donInfos) != registeredDons {
lggr.Debugw("expected dons not registered", "expected", registeredDons, "got", len(donInfos))
if !containsAllDONs(donInfos, p2pIdsToDon) {
lggr.Debugw("some expected dons not registered yet, re-checking after a delay ...")
time.Sleep(2 * time.Second)
} else {
foundAll = true
break
}
}
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
return nil, fmt.Errorf("failed to call GetDONs: %w", err)
}
if !foundAll {
return nil, fmt.Errorf("did not find all desired DONS")
}

resp := registerDonsResponse{
donInfos: make(map[string]kcr.CapabilitiesRegistryDONInfo),
}
for i, donInfo := range donInfos {
donName, ok := p2pIdsToDon[sortedHash(donInfo.NodeP2PIds)]
if !ok {
return nil, fmt.Errorf("don not found for p2pids %s in %v", sortedHash(donInfo.NodeP2PIds), p2pIdsToDon)
lggr.Debugw("irrelevant DON found in the registry, ignoring", "p2p sorted hash", sortedHash(donInfo.NodeP2PIds))
continue
}
lggr.Debugw("adding don info", "don", donName, "cnt", i)
lggr.Debugw("adding don info to the reponse (keyed by DON name)", "don", donName)
resp.donInfos[donName] = donInfos[i]
}
lggr.Debugw("found registered DONs", "count", len(resp.donInfos))
if len(resp.donInfos) != registeredDons {
return nil, fmt.Errorf("expected %d dons, got %d", registeredDons, len(resp.donInfos))
}
return &resp, nil
}

// containsAllDONs reports whether every DON keyed in p2pIdsToDon is present in donInfos.
// DONs are matched by the sortedHash of their NodeP2PIds; entries of donInfos whose
// hash is not a key of p2pIdsToDon are ignored.
func containsAllDONs(donInfos []kcr.CapabilitiesRegistryDONInfo, p2pIdsToDon map[string]string) bool {
	// Index the hashes of all DONs reported by the registry.
	registered := make(map[string]struct{}, len(donInfos))
	for i := range donInfos {
		registered[sortedHash(donInfos[i].NodeP2PIds)] = struct{}{}
	}
	// Every expected hash must be among the registered ones.
	for wantHash := range p2pIdsToDon {
		if _, present := registered[wantHash]; !present {
			return false
		}
	}
	return true
}

// configureForwarder sets the config for the forwarder contract on the chain for all DONs that accept workflows.
// DONs that don't accept workflows are not registered with the forwarder.
func configureForwarder(lggr logger.Logger, chain deployment.Chain, fwdr *kf.KeystoneForwarder, dons []RegisteredDon) error {
Expand Down
8 changes: 4 additions & 4 deletions deployment/keystone/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ type DonCapabilities struct {
}

// map the node id to the NOP
func (dc DonCapabilities) nodeIdToNop(cs uint64) (map[string]capabilities_registry.CapabilitiesRegistryNodeOperator, error) {
func (dc DonCapabilities) nopsByNodeID(chainSelector uint64) (map[string]capabilities_registry.CapabilitiesRegistryNodeOperator, error) {
out := make(map[string]capabilities_registry.CapabilitiesRegistryNodeOperator)
for _, nop := range dc.Nops {
for _, node := range nop.Nodes {
a, err := AdminAddress(node, cs)
a, err := AdminAddress(node, chainSelector)
if err != nil {
return nil, fmt.Errorf("failed to get admin address for node %s: %w", node.ID, err)
}
out[node.ID] = NodeOperator(dc.Name, a)
out[node.ID] = NodeOperator(nop.Name, a)

}
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func AdminAddress(n *models.Node, chainSel uint64) (string, error) {
func nodesToNops(dons []DonCapabilities, chainSel uint64) (map[string]capabilities_registry.CapabilitiesRegistryNodeOperator, error) {
out := make(map[string]capabilities_registry.CapabilitiesRegistryNodeOperator)
for _, don := range dons {
nops, err := don.nodeIdToNop(chainSel)
nops, err := don.nopsByNodeID(chainSel)
if err != nil {
return nil, fmt.Errorf("failed to get registry NOPs for don %s: %w", don.Name, err)
}
Expand Down

0 comments on commit 939f9f6

Please sign in to comment.