Skip to content

Commit

Permalink
Rework ER/T terminator management based on SDK terminator management …
Browse files Browse the repository at this point in the history
…code
  • Loading branch information
plorenz committed Feb 18, 2025
1 parent a7efbcd commit dee43ba
Show file tree
Hide file tree
Showing 36 changed files with 1,581 additions and 778 deletions.
1 change: 1 addition & 0 deletions common/inspect/controller_inspections.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ type ControllerInspectDetail struct {
Latency string `json:"latency"`
Version string `json:"version"`
TimeSinceLastContact string `json:"timeSinceLastContact"`
IsLeader bool `json:"isLeader"`
}
14 changes: 10 additions & 4 deletions common/inspect/terminator_inspections.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type SdkTerminatorInspectDetail struct {
AssignIds bool `json:"assignIds"`
V2 bool `json:"v2"`
SupportsInspect bool `json:"supportsInspect"`
OperationActive bool `json:"establishActive"`
OperationActive bool `json:"operationActive"`
CreateTime string `json:"createTime"`
LastAttempt string `json:"lastAttempt"`
}
Expand All @@ -55,7 +55,13 @@ type ErtTerminatorInspectResult struct {
}

type ErtTerminatorInspectDetail struct {
Key string `json:"key"`
Id string `json:"id"`
State string `json:"state"`
Key string `json:"key"`
Id string `json:"id"`
State string `json:"state"`
Instance string `json:"instance"`
Cost uint16 `json:"cost"`
Precedence string `json:"precedence"`
OperationActive bool `json:"operationActive"`
CreateTime string `json:"createTime"`
LastAttempt string `json:"lastAttempt"`
}
653 changes: 338 additions & 315 deletions common/pb/edge_ctrl_pb/edge_ctrl.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/pb/edge_ctrl_pb/edge_ctrl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ message CreateTunnelTerminatorRequestV2 {
message CreateTunnelTerminatorResponseV2 {
string terminatorId = 1;
int64 startTime = 2;
CreateTerminatorResult result = 3;
string msg = 4;
}

message UpdateTunnelTerminatorRequest {
Expand Down
79 changes: 78 additions & 1 deletion common/router_data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,96 @@ type Identity struct {
serviceSetIndex uint64
}

func (self *Identity) Equals(other *Identity) bool {
log := pfxlog.Logger().WithField("identity", self.identityIndex)
if self.Disabled != other.Disabled {
log.Info("identity updated, disabled flag changed")
return false
}

if self.Name != other.Name {
log.Info("identity updated, name changed")
return false
}

if string(self.AppDataJson) != string(other.AppDataJson) {
log.Info("identity updated, appDataJson changed")
return false
}

if self.DefaultHostingPrecedence != other.DefaultHostingPrecedence {
log.Info("identity updated, default hosting precedence changed")
return false
}

if self.DefaultHostingCost != other.DefaultHostingCost {
log.Info("identity updated, default hosting host changed")
return false
}

if len(self.ServiceHostingPrecedences) != len(other.ServiceHostingPrecedences) {
log.Info("identity updated, number of service hosting precedences changed")
return false
}

if len(self.ServiceHostingCosts) != len(other.ServiceHostingCosts) {
log.Info("identity updated, number of service hosting costs changed")
return false
}

for k, v := range self.ServiceHostingPrecedences {
v2, ok := other.ServiceHostingPrecedences[k]
if !ok || v != v2 {
log.Info("identity updated, a service hosting precedence changed")
return false
}
}

for k, v := range self.ServiceHostingCosts {
v2, ok := other.ServiceHostingCosts[k]
if !ok || v != v2 {
log.Info("identity updated, a service hosting cost changed")
return false
}
}

return true
}

type DataStateConfigType = edge_ctrl_pb.DataState_ConfigType

type ConfigType struct {
*DataStateConfigType
index uint64
}

func (self *ConfigType) Equals(other *ConfigType) bool {
return self.Name == other.Name
}

type DataStateConfig = edge_ctrl_pb.DataState_Config

type Config struct {
*DataStateConfig
index uint64
}

func (self *Config) Equals(other *Config) bool {
if self.Name != other.Name {
return false
}

if self.TypeId != other.TypeId {
return false
}

if self.DataJson != other.DataJson {
return false
}

return true
}

type DataStateService = edge_ctrl_pb.DataState_Service

type Service struct {
Expand Down Expand Up @@ -239,6 +315,7 @@ func NewReceiverRouterDataModelFromExisting(existing *RouterDataModel, listenerB
events: make(chan subscriberEvent),
closeNotify: closeNotify,
stopNotify: make(chan struct{}),
timelineId: existing.timelineId,
}
currentIndex, _ := existing.CurrentIndex()
result.SetCurrentIndex(currentIndex)
Expand Down Expand Up @@ -908,7 +985,7 @@ func (rdm *RouterDataModel) SubscribeToIdentityChanges(identityId string, subscr
})

if identity != nil {
state := subscription.initialize(rdm, identity)
state, _ := subscription.initialize(rdm, identity)
subscriber.NotifyIdentityEvent(state, EventFullState)
}

Expand Down
88 changes: 77 additions & 11 deletions common/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type IdentityConfig struct {
ConfigType *ConfigType
}

func (self *IdentityConfig) Equals(other *IdentityConfig) bool {
return self.Config.Equals(other.Config) && self.ConfigType.Equals(other.ConfigType)
}

type IdentityService struct {
Service *Service
Checks map[string]struct{}
Expand All @@ -40,41 +44,64 @@ type IdentityService struct {
}

func (self *IdentityService) Equals(other *IdentityService) bool {
if self.Service.index != other.Service.index {
log := pfxlog.Logger().WithField("serviceId", other.Service.Id).WithField("serviceName", other.Service.Name)
if self.Service.Name != other.Service.Name {
log.WithField("field", "name").Info("service updated")
return false
}

if self.Service.EncryptionRequired != other.Service.EncryptionRequired {
log.WithField("field", "encryptionRequired").Info("service updated")
return false
}

if len(self.Service.Configs) != len(other.Service.Configs) {
log.WithField("field", "configs.len").Info("service updated")
return false
}

if len(self.Checks) != len(other.Checks) {
log.WithField("field", "checks.len").Info("service updated")
return false
}

if len(self.Configs) != len(other.Configs) {
log.WithField("field", "identity.configs.len").Info("service updated")
return false
}

if self.DialAllowed != other.DialAllowed {
log.WithField("field", "dialAllowed").Info("service updated")
return false
}

if self.BindAllowed != other.BindAllowed {
log.WithField("field", "bindAllowed").Info("service updated")
return false
}

for id := range self.Checks {
if _, ok := other.Checks[id]; !ok {
log.WithField("field", "checks").Info("service updated")
return false
}
}

for id, config := range self.Configs {
otherConfig, ok := other.Configs[id]
if !ok {
log.WithField("field", "identity.configs").Info("service updated")
return false
}
if config.Config.index != otherConfig.Config.index {
if !config.Equals(otherConfig) {
log.WithField("field", "identity.configs").Info("service updated")
return false
}
if config.ConfigType.index != otherConfig.ConfigType.index {
}

for idx, v := range self.Service.Configs {
if other.Service.Configs[idx] != v {
log.WithField("field", "configs").Info("service updated")
return false
}
}
Expand Down Expand Up @@ -142,8 +169,10 @@ func (self *IdentitySubscription) identityUpdated(identity *Identity) {
self.Lock()
if self.Identity != nil {
if identity.identityIndex > self.Identity.identityIndex {
if !identity.Equals(self.Identity) {
notify = true
}
self.Identity = identity
notify = true
}
present = true
state = self.getState()
Expand Down Expand Up @@ -181,16 +210,19 @@ func (self *IdentitySubscription) identityRemoved() {
}
}

func (self *IdentitySubscription) initialize(rdm *RouterDataModel, identity *Identity) *IdentityState {
func (self *IdentitySubscription) initialize(rdm *RouterDataModel, identity *Identity) (*IdentityState, bool) {
self.Lock()
defer self.Unlock()
wasInitialized := false
if self.Identity == nil {
self.Identity = identity
if self.Services == nil {
self.Services, self.Checks = rdm.buildServiceList(self)
}
} else {
wasInitialized = true
}
return self.getState()
return self.getState(), wasInitialized
}

func (self *IdentitySubscription) checkForChanges(rdm *RouterDataModel) {
Expand Down Expand Up @@ -244,8 +276,10 @@ func (self *IdentitySubscription) checkForChanges(rdm *RouterDataModel) {
}

if oldIdentity.identityIndex < newIdentity.identityIndex {
for _, subscriber := range self.listeners.Value() {
subscriber.NotifyIdentityEvent(state, EventIdentityUpdated)
if !oldIdentity.Equals(newIdentity) {
for _, subscriber := range self.listeners.Value() {
subscriber.NotifyIdentityEvent(state, EventIdentityUpdated)
}
}
}

Expand Down Expand Up @@ -296,8 +330,36 @@ func (self *IdentitySubscription) checkForChanges(rdm *RouterDataModel) {

type IdentityEventType byte

func (self IdentityEventType) String() string {
switch self {
case EventFullState:
return "identity.full-state"
case EventIdentityUpdated:
return "identity.updated"
case EventPostureChecksUpdated:
return "identity.posture-checks-updated"
case EventIdentityDeleted:
return "identity.deleted"
default:
return "unknown"
}
}

type ServiceEventType byte

func (self ServiceEventType) String() string {
switch self {
case EventAccessGained:
return "access.gained"
case EventUpdated:
return "updated"
case EventAccessRemoved:
return "access.removed"
default:
return "unknown"
}
}

const (
EventAccessGained ServiceEventType = 1
EventUpdated ServiceEventType = 2
Expand Down Expand Up @@ -345,9 +407,13 @@ func (self identityCreatedEvent) process(rdm *RouterDataModel) {
Debug("handling identity created event")

if sub, found := rdm.subscriptions.Get(self.identity.Id); found {
state := sub.initialize(rdm, self.identity)
for _, subscriber := range sub.listeners.Value() {
subscriber.NotifyIdentityEvent(state, EventFullState)
state, wasInitialized := sub.initialize(rdm, self.identity)
if wasInitialized {
sub.checkForChanges(rdm)
} else {
for _, subscriber := range sub.listeners.Value() {
subscriber.NotifyIdentityEvent(state, EventFullState)
}
}
}
}
Expand Down
28 changes: 27 additions & 1 deletion controller/handler_ctrl/remove_terminators.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,33 @@ func (self *removeTerminatorsHandler) HandleReceive(msg *channel.Message, ch cha
func (self *removeTerminatorsHandler) handleRemoveTerminators(msg *channel.Message, ch channel.Channel, request *ctrl_pb.RemoveTerminatorsRequest) {
log := pfxlog.ContextLogger(ch.Label())

if err := self.network.Terminator.DeleteBatch(request.TerminatorIds, self.newChangeContext(ch, "fabric.remove.terminators.batch")); err == nil {
var terminatorIds []string
if self.network.Dispatcher.IsLeader() {
for _, id := range request.TerminatorIds {
isPresent, err := self.network.Terminator.IsEntityPresent(id)
if isPresent || err != nil {
terminatorIds = append(terminatorIds, id)
} else {
log.
WithField("routerId", ch.Id()).
WithField("terminatorId", id).
Info("delete requested of terminator that doesn't exist")
}
}
} else {
terminatorIds = request.TerminatorIds
}

if len(terminatorIds) == 0 {
log.
WithField("routerId", ch.Id()).
WithField("terminatorIds", request.TerminatorIds).
Info("responding to batch terminator delete for non-present terminators")
handler_common.SendSuccess(msg, ch, "")
return
}

if err := self.network.Terminator.DeleteBatch(terminatorIds, self.newChangeContext(ch, "fabric.remove.terminators.batch")); err == nil {
log.
WithField("routerId", ch.Id()).
WithField("terminatorIds", request.TerminatorIds).
Expand Down
Loading

0 comments on commit dee43ba

Please sign in to comment.