Skip to content

Commit

Permalink
[Keystone] Launcher: don't fail if some capabilities are missing loca…
Browse files Browse the repository at this point in the history
…lly (#15174)

If the onchain registry has capabilities that are not enabled locally, we don't
want to fail everything but continue launching other capabilities that are available.
  • Loading branch information
bolekk authored Nov 11, 2024
1 parent b984ea9 commit 9ac601d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 27 deletions.
24 changes: 13 additions & 11 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist
}
}

// - remote capability DONs (with IsPublic = true) the current node is a part of.
// These need server-side shims.
// Capability DONs (with IsPublic = true) the current node is a part of.
// These need server-side shims to expose my own capabilities externally.
myCapabilityDONs := []registrysyncer.DON{}
remoteCapabilityDONs := []registrysyncer.DON{}
for _, d := range publicDONs {
Expand Down Expand Up @@ -223,11 +223,11 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist
}
}

// Finally, if I'm a capability DON, let's enable external access
// Finally, if I'm in a capability DON, let's enable external access
// to the capability.
if len(myCapabilityDONs) > 0 {
for _, mcd := range myCapabilityDONs {
err := w.exposeCapabilities(ctx, myID, mcd, state, remoteWorkflowDONs)
for _, myDON := range myCapabilityDONs {
err := w.exposeCapabilities(ctx, myID, myDON, state, remoteWorkflowDONs)
if err != nil {
return err
}
Expand Down Expand Up @@ -395,10 +395,10 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee

switch capability.CapabilityType {
case capabilities.CapabilityTypeTrigger:
newTriggerPublisher := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
newTriggerPublisher := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
publisher := remote.NewTriggerPublisher(
capabilityConfig.RemoteTriggerConfig,
capability.(capabilities.TriggerCapability),
cap.(capabilities.TriggerCapability),
info,
don.DON,
idsToDONs,
Expand All @@ -410,18 +410,19 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee

err := w.addReceiver(ctx, capability, don, newTriggerPublisher)
if err != nil {
return fmt.Errorf("failed to add server-side receiver: %w", err)
w.lggr.Errorw("failed to add server-side receiver for a trigger capability - it won't be exposed remotely", "id", cid, "error", err)
// continue attempting other capabilities
}
case capabilities.CapabilityTypeAction:
w.lggr.Warn("no remote client configured for capability type action, skipping configuration")
case capabilities.CapabilityTypeConsensus:
w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration")
case capabilities.CapabilityTypeTarget:
newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
newTargetServer := func(cap capabilities.BaseCapability, info capabilities.CapabilityInfo) (remotetypes.ReceiverService, error) {
return target.NewServer(
capabilityConfig.RemoteTargetConfig,
myPeerID,
capability.(capabilities.TargetCapability),
cap.(capabilities.TargetCapability),
info,
don.DON,
idsToDONs,
Expand All @@ -433,7 +434,8 @@ func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.Pee

err := w.addReceiver(ctx, capability, don, newTargetServer)
if err != nil {
return fmt.Errorf("failed to add server-side receiver: %w", err)
w.lggr.Errorw("failed to add server-side receiver for a target capability - it won't be exposed remotely", "id", cid, "error", err)
// continue attempting other capabilities
}
default:
w.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability)
Expand Down
20 changes: 14 additions & 6 deletions core/capabilities/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) {

triggerCapID := randomWord()
targetCapID := randomWord()
// one capability from onchain registry is not set up locally
fullMissingTargetID := "[email protected]"
missingTargetCapID := randomWord()
dID := uint32(1)
// The below state describes a Workflow DON (AcceptsWorkflows = true),
// which exposes the streams-trigger and write_chain capabilities.
Expand All @@ -130,8 +133,9 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) {
Members: nodes,
},
CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{
fullTriggerCapID: {},
fullTargetID: {},
fullTriggerCapID: {},
fullTargetID: {},
fullMissingTargetID: {},
},
},
},
Expand All @@ -144,35 +148,39 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) {
ID: "[email protected]",
CapabilityType: capabilities.CapabilityTypeTarget,
},
fullMissingTargetID: {
ID: fullMissingTargetID,
CapabilityType: capabilities.CapabilityTypeTarget,
},
},
IDsToNodes: map[p2ptypes.PeerID]kcr.INodeInfoProviderNodeInfo{
nodes[0]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[0],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
nodes[1]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[1],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
nodes[2]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[2],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
nodes[3]: {
NodeOperatorId: 1,
Signer: randomWord(),
P2pId: nodes[3],
EncryptionPublicKey: randomWord(),
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID},
HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID, missingTargetCapID},
},
},
}
Expand Down
20 changes: 10 additions & 10 deletions core/services/registrysyncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (s *registrySyncer) updateStateLoop() {
}
}

func (s *registrySyncer) localRegistry(ctx context.Context) (*LocalRegistry, error) {
func (s *registrySyncer) importOnchainRegistry(ctx context.Context) (*LocalRegistry, error) {
caps := []kcr.CapabilitiesRegistryCapabilityInfo{}

err := s.reader.GetLatestValue(ctx, s.capabilitiesContract.ReadIdentifier("getCapabilities"), primitives.Unconfirmed, nil, &caps)
Expand Down Expand Up @@ -288,33 +288,33 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error {
s.reader = reader
}

var lr *LocalRegistry
var latestRegistry *LocalRegistry
var err error

if isInitialSync {
s.lggr.Debug("syncing with local registry")
lr, err = s.orm.LatestLocalRegistry(ctx)
latestRegistry, err = s.orm.LatestLocalRegistry(ctx)
if err != nil {
s.lggr.Warnw("failed to sync with local registry, using remote registry instead", "error", err)
} else {
lr.lggr = s.lggr
lr.getPeerID = s.getPeerID
latestRegistry.lggr = s.lggr
latestRegistry.getPeerID = s.getPeerID
}
}

if lr == nil {
if latestRegistry == nil {
s.lggr.Debug("syncing with remote registry")
localRegistry, err := s.localRegistry(ctx)
importedRegistry, err := s.importOnchainRegistry(ctx)
if err != nil {
return fmt.Errorf("failed to sync with remote registry: %w", err)
}
lr = localRegistry
latestRegistry = importedRegistry
// Attempt to send local registry to the update channel without blocking
// This is to prevent the tests from hanging if they are not calling `Start()` on the syncer
select {
case <-s.stopCh:
s.lggr.Debug("sync cancelled, stopping")
case s.updateChan <- lr:
case s.updateChan <- latestRegistry:
// Successfully sent state
s.lggr.Debug("remote registry update triggered successfully")
default:
Expand All @@ -324,7 +324,7 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error {
}

for _, h := range s.launchers {
lrCopy := deepCopyLocalRegistry(lr)
lrCopy := deepCopyLocalRegistry(latestRegistry)
if err := h.Launch(ctx, &lrCopy); err != nil {
s.lggr.Errorf("error calling launcher: %s", err)
s.metrics.incrementLauncherFailureCounter(ctx)
Expand Down

0 comments on commit 9ac601d

Please sign in to comment.