Skip to content

Commit

Permalink
[Keystone][Deployments] Make adding NOPs and DONs idempotent (#15163)
Browse files Browse the repository at this point in the history
Similarly to adding capabilities or nodes, don't add NOPs or DONs that already exist.
  • Loading branch information
bolekk authored Nov 8, 2024
1 parent 4bec5f4 commit 45db6b7
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 31 deletions.
2 changes: 1 addition & 1 deletion deployment/keystone/changeset/internal/test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func deployCapReg(t *testing.T, lggr logger.Logger, chain deployment.Chain) *kcr
}

func addNops(t *testing.T, lggr logger.Logger, chain deployment.Chain, registry *kcr.CapabilitiesRegistry, nops []kcr.CapabilitiesRegistryNodeOperator) *kslib.RegisterNOPSResponse {
resp, err := kslib.RegisterNOPS(context.TODO(), kslib.RegisterNOPSRequest{
resp, err := kslib.RegisterNOPS(context.TODO(), lggr, kslib.RegisterNOPSRequest{
Chain: chain,
Registry: registry,
Nops: nops,
Expand Down
110 changes: 84 additions & 26 deletions deployment/keystone/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func ConfigureRegistry(ctx context.Context, lggr logger.Logger, req ConfigureCon
for _, nop := range nodeIdToNop {
nops = append(nops, nop)
}
nopsResp, err := RegisterNOPS(ctx, RegisterNOPSRequest{
nopsResp, err := RegisterNOPS(ctx, lggr, RegisterNOPSRequest{
Chain: registryChain,
Registry: registry,
Nops: nops,
Expand Down Expand Up @@ -231,7 +231,7 @@ func ConfigureRegistry(ctx context.Context, lggr logger.Logger, req ConfigureCon
if err != nil {
return nil, fmt.Errorf("failed to register DONS: %w", err)
}
lggr.Infow("registered DONS", "dons", len(donsResp.donInfos))
lggr.Infow("registered DONs", "dons", len(donsResp.donInfos))

return &ConfigureContractsResponse{
Changeset: &deployment.ChangesetOutput{
Expand Down Expand Up @@ -371,6 +371,7 @@ func registerCapabilities(lggr logger.Logger, req registerCapabilitiesRequest) (
if len(req.donToCapabilities) == 0 {
return nil, fmt.Errorf("no capabilities to register")
}
lggr.Infow("registering capabilities...", "len", len(req.donToCapabilities))
resp := &registerCapabilitiesResponse{
donToCapabilities: make(map[string][]RegisteredCapability),
}
Expand Down Expand Up @@ -421,8 +422,37 @@ type RegisterNOPSResponse struct {
Nops []*kcr.CapabilitiesRegistryNodeOperatorAdded
}

func RegisterNOPS(ctx context.Context, req RegisterNOPSRequest) (*RegisterNOPSResponse, error) {
nops := req.Nops
func RegisterNOPS(ctx context.Context, lggr logger.Logger, req RegisterNOPSRequest) (*RegisterNOPSResponse, error) {
lggr.Infow("registering node operators...", "len", len(req.Nops))
existingNops, err := req.Registry.GetNodeOperators(&bind.CallOpts{})
if err != nil {
return nil, err
}
existingNopsAddrToID := make(map[capabilities_registry.CapabilitiesRegistryNodeOperator]uint32)
for id, nop := range existingNops {
existingNopsAddrToID[nop] = uint32(id)
}
lggr.Infow("fetched existing node operators", "len", len(existingNopsAddrToID))
resp := &RegisterNOPSResponse{
Nops: []*kcr.CapabilitiesRegistryNodeOperatorAdded{},
}
nops := []kcr.CapabilitiesRegistryNodeOperator{}
for _, nop := range req.Nops {
if id, ok := existingNopsAddrToID[nop]; !ok {
nops = append(nops, nop)
} else {
lggr.Debugw("node operator already exists", "name", nop.Name, "admin", nop.Admin.String(), "id", id)
resp.Nops = append(resp.Nops, &kcr.CapabilitiesRegistryNodeOperatorAdded{
NodeOperatorId: id,
Name: nop.Name,
Admin: nop.Admin,
})
}
}
if len(nops) == 0 {
lggr.Debug("no new node operators to register")
return resp, nil
}
tx, err := req.Registry.AddNodeOperators(req.Chain.DeployerKey, nops)
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
Expand All @@ -442,15 +472,12 @@ func RegisterNOPS(ctx context.Context, req RegisterNOPSRequest) (*RegisterNOPSRe
if len(receipt.Logs) != len(nops) {
return nil, fmt.Errorf("expected %d log entries for AddNodeOperators, got %d", len(nops), len(receipt.Logs))
}
resp := &RegisterNOPSResponse{
Nops: make([]*kcr.CapabilitiesRegistryNodeOperatorAdded, len(receipt.Logs)),
}
for i, log := range receipt.Logs {
o, err := req.Registry.ParseNodeOperatorAdded(*log)
if err != nil {
return nil, fmt.Errorf("failed to parse log %d for operator added: %w", i, err)
}
resp.Nops[i] = o
resp.Nops = append(resp.Nops, o)
}

return resp, nil
Expand Down Expand Up @@ -531,6 +558,7 @@ type registerNodesResponse struct {
// can sign the transactions update the contract state
// TODO: 467 refactor to support MCMS. Specifically need to separate the call data generation from the actual contract call
func registerNodes(lggr logger.Logger, req *registerNodesRequest) (*registerNodesResponse, error) {
lggr.Infow("registering nodes...", "len", len(req.nodeIdToNop))
nopToNodeIDs := make(map[kcr.CapabilitiesRegistryNodeOperator][]string)
for nodeID, nop := range req.nodeIdToNop {
if _, ok := nopToNodeIDs[nop]; !ok {
Expand Down Expand Up @@ -623,7 +651,7 @@ func registerNodes(lggr logger.Logger, req *registerNodesRequest) (*registerNode
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
if strings.Contains(err.Error(), "NodeAlreadyExists") {
lggr.Warnw("node already exists, skipping", "p2pid", singleNodeParams.P2pId)
lggr.Warnw("node already exists, skipping", "p2pid", hex.EncodeToString(singleNodeParams.P2pId[:]))
continue
}
return nil, fmt.Errorf("failed to call AddNode for node with p2pid %v: %w", singleNodeParams.P2pId, err)
Expand Down Expand Up @@ -672,13 +700,22 @@ func sortedHash(p2pids [][32]byte) string {
}

func registerDons(lggr logger.Logger, req registerDonsRequest) (*registerDonsResponse, error) {
resp := registerDonsResponse{
donInfos: make(map[string]kcr.CapabilitiesRegistryDONInfo),
}
lggr.Infow("registering DONs...", "len", len(req.donToOcr2Nodes))
// track hash of sorted p2pids to don name because the registry return value does not include the don name
// and we need to map it back to the don name to access the other mapping data such as the don's capabilities & nodes
p2pIdsToDon := make(map[string]string)
var registeredDons = 0
var addedDons = 0

donInfos, err := req.registry.GetDONs(&bind.CallOpts{})
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
return nil, fmt.Errorf("failed to call GetDONs: %w", err)
}
existingDONs := make(map[string]struct{})
for _, donInfo := range donInfos {
existingDONs[sortedHash(donInfo.NodeP2PIds)] = struct{}{}
}
lggr.Infow("fetched existing DONs...", "len", len(donInfos), "lenByNodesHash", len(existingDONs))

for don, ocr2nodes := range req.donToOcr2Nodes {
var p2pIds [][32]byte
Expand All @@ -695,6 +732,12 @@ func registerDons(lggr logger.Logger, req registerDonsRequest) (*registerDonsRes

p2pSortedHash := sortedHash(p2pIds)
p2pIdsToDon[p2pSortedHash] = don

if _, ok := existingDONs[p2pSortedHash]; ok {
lggr.Debugw("don already exists, ignoring", "don", don, "p2p sorted hash", p2pSortedHash)
continue
}

caps, ok := req.donToCapabilities[don]
if !ok {
return nil, fmt.Errorf("capabilities not found for node operator %s", don)
Expand Down Expand Up @@ -728,44 +771,59 @@ func registerDons(lggr logger.Logger, req registerDonsRequest) (*registerDonsRes
return nil, fmt.Errorf("failed to confirm AddDON transaction %s for don %s: %w", tx.Hash().String(), don, err)
}
lggr.Debugw("registered DON", "don", don, "p2p sorted hash", p2pSortedHash, "cgs", cfgs, "wfSupported", wfSupported, "f", f)
registeredDons++
addedDons++
}
lggr.Debugf("Registered all DONS %d, waiting for registry to update", registeredDons)
lggr.Debugf("Registered all DONs (new=%d), waiting for registry to update", addedDons)

// occasionally the registry does not return the expected number of DONS immediately after the txns above
// so we retry a few times. while crude, it is effective
var donInfos []capabilities_registry.CapabilitiesRegistryDONInfo
var err error
foundAll := false
for i := 0; i < 10; i++ {
lggr.Debug("attempting to get DONS from registry", i)
lggr.Debugw("attempting to get DONs from registry", "attempt#", i)
donInfos, err = req.registry.GetDONs(&bind.CallOpts{})
if len(donInfos) != registeredDons {
lggr.Debugw("expected dons not registered", "expected", registeredDons, "got", len(donInfos))
if !containsAllDONs(donInfos, p2pIdsToDon) {
lggr.Debugw("some expected dons not registered yet, re-checking after a delay ...")
time.Sleep(2 * time.Second)
} else {
foundAll = true
break
}
}
if err != nil {
err = DecodeErr(kcr.CapabilitiesRegistryABI, err)
return nil, fmt.Errorf("failed to call GetDONs: %w", err)
}
if !foundAll {
return nil, fmt.Errorf("did not find all desired DONS")
}

resp := registerDonsResponse{
donInfos: make(map[string]kcr.CapabilitiesRegistryDONInfo),
}
for i, donInfo := range donInfos {
donName, ok := p2pIdsToDon[sortedHash(donInfo.NodeP2PIds)]
if !ok {
return nil, fmt.Errorf("don not found for p2pids %s in %v", sortedHash(donInfo.NodeP2PIds), p2pIdsToDon)
lggr.Debugw("irrelevant DON found in the registry, ignoring", "p2p sorted hash", sortedHash(donInfo.NodeP2PIds))
continue
}
lggr.Debugw("adding don info", "don", donName, "cnt", i)
lggr.Debugw("adding don info to the reponse (keyed by DON name)", "don", donName)
resp.donInfos[donName] = donInfos[i]
}
lggr.Debugw("found registered DONs", "count", len(resp.donInfos))
if len(resp.donInfos) != registeredDons {
return nil, fmt.Errorf("expected %d dons, got %d", registeredDons, len(resp.donInfos))
}
return &resp, nil
}

// are all DONs from p2pIdsToDon in donInfos
func containsAllDONs(donInfos []kcr.CapabilitiesRegistryDONInfo, p2pIdsToDon map[string]string) bool {
found := make(map[string]struct{})
for _, donInfo := range donInfos {
hash := sortedHash(donInfo.NodeP2PIds)
if _, ok := p2pIdsToDon[hash]; ok {
found[hash] = struct{}{}
}
}
return len(found) == len(p2pIdsToDon)
}

// configureForwarder sets the config for the forwarder contract on the chain for all Dons that accept workflows
// dons that don't accept workflows are not registered with the forwarder
func configureForwarder(lggr logger.Logger, chain deployment.Chain, fwdr *kf.KeystoneForwarder, dons []RegisteredDon) error {
Expand Down
8 changes: 4 additions & 4 deletions deployment/keystone/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ type DonCapabilities struct {
}

// map the node id to the NOP
func (dc DonCapabilities) nodeIdToNop(cs uint64) (map[string]capabilities_registry.CapabilitiesRegistryNodeOperator, error) {
func (dc DonCapabilities) nopsByNodeID(chainSelector uint64) (map[string]capabilities_registry.CapabilitiesRegistryNodeOperator, error) {
out := make(map[string]capabilities_registry.CapabilitiesRegistryNodeOperator)
for _, nop := range dc.Nops {
for _, node := range nop.Nodes {
a, err := AdminAddress(node, cs)
a, err := AdminAddress(node, chainSelector)
if err != nil {
return nil, fmt.Errorf("failed to get admin address for node %s: %w", node.ID, err)
}
out[node.ID] = NodeOperator(dc.Name, a)
out[node.ID] = NodeOperator(nop.Name, a)

}
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func AdminAddress(n *models.Node, chainSel uint64) (string, error) {
func nodesToNops(dons []DonCapabilities, chainSel uint64) (map[string]capabilities_registry.CapabilitiesRegistryNodeOperator, error) {
out := make(map[string]capabilities_registry.CapabilitiesRegistryNodeOperator)
for _, don := range dons {
nops, err := don.nodeIdToNop(chainSel)
nops, err := don.nopsByNodeID(chainSel)
if err != nil {
return nil, fmt.Errorf("failed to get registry NOPs for don %s: %w", don.Name, err)
}
Expand Down

0 comments on commit 45db6b7

Please sign in to comment.