diff --git a/api/system/v1alpha1/zone_insight.pb.go b/api/system/v1alpha1/zone_insight.pb.go index f06ced95425a..e3594c22e7ce 100644 --- a/api/system/v1alpha1/zone_insight.pb.go +++ b/api/system/v1alpha1/zone_insight.pb.go @@ -32,8 +32,11 @@ type ZoneInsight struct { // List of KDS subscriptions created by a given Zone Kuma CP. Subscriptions []*KDSSubscription `protobuf:"bytes,1,rep,name=subscriptions,proto3" json:"subscriptions,omitempty"` // Statistics about Envoy Admin Streams + // Deprecated: use kds_streams instead. EnvoyAdminStreams *EnvoyAdminStreams `protobuf:"bytes,2,opt,name=envoy_admin_streams,json=envoyAdminStreams,proto3" json:"envoy_admin_streams,omitempty"` HealthCheck *HealthCheck `protobuf:"bytes,3,opt,name=health_check,json=healthCheck,proto3" json:"health_check,omitempty"` + // Information about kds streams that are estabilished between global and zone + KdsStreams *KDSStreams `protobuf:"bytes,4,opt,name=kds_streams,json=kdsStreams,proto3" json:"kds_streams,omitempty"` } func (x *ZoneInsight) Reset() { @@ -89,6 +92,13 @@ func (x *ZoneInsight) GetHealthCheck() *HealthCheck { return nil } +func (x *ZoneInsight) GetKdsStreams() *KDSStreams { + if x != nil { + return x.KdsStreams + } + return nil +} + type EnvoyAdminStreams struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -155,6 +165,147 @@ func (x *EnvoyAdminStreams) GetClustersGlobalInstanceId() string { return "" } +type KDSStreams struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Details of stream that handles Clusters stream. + Clusters *KDSStream `protobuf:"bytes,1,opt,name=clusters,proto3" json:"clusters,omitempty"` + // Details of stream that handles XDS Config Dump stream. + ConfigDump *KDSStream `protobuf:"bytes,2,opt,name=config_dump,json=configDump,proto3" json:"config_dump,omitempty"` + // Details of stream that handles Stats stream. + Stats *KDSStream `protobuf:"bytes,3,opt,name=stats,proto3" json:"stats,omitempty"` + // Details of stream that handles global to zone resource sync stream. + GlobalToZone *KDSStream `protobuf:"bytes,4,opt,name=global_to_zone,json=globalToZone,proto3" json:"global_to_zone,omitempty"` + // Details of stream that handles zone to global resource sync stream. + ZoneToGlobal *KDSStream `protobuf:"bytes,5,opt,name=zone_to_global,json=zoneToGlobal,proto3" json:"zone_to_global,omitempty"` +} + +func (x *KDSStreams) Reset() { + *x = KDSStreams{} + if protoimpl.UnsafeEnabled { + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *KDSStreams) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KDSStreams) ProtoMessage() {} + +func (x *KDSStreams) ProtoReflect() protoreflect.Message { + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KDSStreams.ProtoReflect.Descriptor instead. +func (*KDSStreams) Descriptor() ([]byte, []int) { + return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{2} +} + +func (x *KDSStreams) GetClusters() *KDSStream { + if x != nil { + return x.Clusters + } + return nil +} + +func (x *KDSStreams) GetConfigDump() *KDSStream { + if x != nil { + return x.ConfigDump + } + return nil +} + +func (x *KDSStreams) GetStats() *KDSStream { + if x != nil { + return x.Stats + } + return nil +} + +func (x *KDSStreams) GetGlobalToZone() *KDSStream { + if x != nil { + return x.GlobalToZone + } + return nil +} + +func (x *KDSStreams) GetZoneToGlobal() *KDSStream { + if x != nil { + return x.ZoneToGlobal + } + return nil +} + +type KDSStream struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Global instance ID that handles the stream. + GlobalInstanceId string `protobuf:"bytes,1,opt,name=global_instance_id,json=globalInstanceId,proto3" json:"global_instance_id,omitempty"` + // Time when the stream was open. + ConnectTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=connect_time,json=connectTime,proto3" json:"connect_time,omitempty"` +} + +func (x *KDSStream) Reset() { + *x = KDSStream{} + if protoimpl.UnsafeEnabled { + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *KDSStream) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KDSStream) ProtoMessage() {} + +func (x *KDSStream) ProtoReflect() protoreflect.Message { + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KDSStream.ProtoReflect.Descriptor instead. +func (*KDSStream) Descriptor() ([]byte, []int) { + return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{3} +} + +func (x *KDSStream) GetGlobalInstanceId() string { + if x != nil { + return x.GlobalInstanceId + } + return "" +} + +func (x *KDSStream) GetConnectTime() *timestamppb.Timestamp { + if x != nil { + return x.ConnectTime + } + return nil +} + // KDSSubscription describes a single KDS subscription // created by a Zone to the Global. // Ideally, there should be only one such subscription per Zone lifecycle. @@ -197,7 +348,7 @@ type KDSSubscription struct { func (x *KDSSubscription) Reset() { *x = KDSSubscription{} if protoimpl.UnsafeEnabled { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[2] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -210,7 +361,7 @@ func (x *KDSSubscription) String() string { func (*KDSSubscription) ProtoMessage() {} func (x *KDSSubscription) ProtoReflect() protoreflect.Message { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[2] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -223,7 +374,7 @@ func (x *KDSSubscription) ProtoReflect() protoreflect.Message { // Deprecated: Use KDSSubscription.ProtoReflect.Descriptor instead. func (*KDSSubscription) Descriptor() ([]byte, []int) { - return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{2} + return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{4} } func (x *KDSSubscription) GetId() string { @@ -312,7 +463,7 @@ type KDSSubscriptionStatus struct { func (x *KDSSubscriptionStatus) Reset() { *x = KDSSubscriptionStatus{} if protoimpl.UnsafeEnabled { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[3] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -325,7 +476,7 @@ func (x *KDSSubscriptionStatus) String() string { func (*KDSSubscriptionStatus) ProtoMessage() {} func (x *KDSSubscriptionStatus) ProtoReflect() protoreflect.Message { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[3] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -338,7 +489,7 @@ func (x *KDSSubscriptionStatus) ProtoReflect() protoreflect.Message { // Deprecated: Use KDSSubscriptionStatus.ProtoReflect.Descriptor instead. func (*KDSSubscriptionStatus) Descriptor() ([]byte, []int) { - return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{3} + return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{5} } func (x *KDSSubscriptionStatus) GetLastUpdateTime() *timestamppb.Timestamp { @@ -379,7 +530,7 @@ type KDSServiceStats struct { func (x *KDSServiceStats) Reset() { *x = KDSServiceStats{} if protoimpl.UnsafeEnabled { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[4] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -392,7 +543,7 @@ func (x *KDSServiceStats) String() string { func (*KDSServiceStats) ProtoMessage() {} func (x *KDSServiceStats) ProtoReflect() protoreflect.Message { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[4] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -405,7 +556,7 @@ func (x *KDSServiceStats) ProtoReflect() protoreflect.Message { // Deprecated: Use KDSServiceStats.ProtoReflect.Descriptor instead. func (*KDSServiceStats) Descriptor() ([]byte, []int) { - return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{4} + return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{6} } func (x *KDSServiceStats) GetResponsesSent() uint64 { @@ -442,7 +593,7 @@ type Version struct { func (x *Version) Reset() { *x = Version{} if protoimpl.UnsafeEnabled { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[5] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -455,7 +606,7 @@ func (x *Version) String() string { func (*Version) ProtoMessage() {} func (x *Version) ProtoReflect() protoreflect.Message { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[5] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -468,7 +619,7 @@ func (x *Version) ProtoReflect() protoreflect.Message { // Deprecated: Use Version.ProtoReflect.Descriptor instead. func (*Version) Descriptor() ([]byte, []int) { - return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{5} + return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{7} } func (x *Version) GetKumaCp() *KumaCpVersion { @@ -499,7 +650,7 @@ type KumaCpVersion struct { func (x *KumaCpVersion) Reset() { *x = KumaCpVersion{} if protoimpl.UnsafeEnabled { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[6] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -512,7 +663,7 @@ func (x *KumaCpVersion) String() string { func (*KumaCpVersion) ProtoMessage() {} func (x *KumaCpVersion) ProtoReflect() protoreflect.Message { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[6] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -525,7 +676,7 @@ func (x *KumaCpVersion) ProtoReflect() protoreflect.Message { // Deprecated: Use KumaCpVersion.ProtoReflect.Descriptor instead. func (*KumaCpVersion) Descriptor() ([]byte, []int) { - return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{6} + return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{8} } func (x *KumaCpVersion) GetVersion() string { @@ -576,7 +727,7 @@ type HealthCheck struct { func (x *HealthCheck) Reset() { *x = HealthCheck{} if protoimpl.UnsafeEnabled { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[7] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -589,7 +740,7 @@ func (x *HealthCheck) String() string { func (*HealthCheck) ProtoMessage() {} func (x *HealthCheck) ProtoReflect() protoreflect.Message { - mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[7] + mi := &file_api_system_v1alpha1_zone_insight_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -602,7 +753,7 @@ func (x *HealthCheck) ProtoReflect() protoreflect.Message { // Deprecated: Use HealthCheck.ProtoReflect.Descriptor instead. func (*HealthCheck) Descriptor() ([]byte, []int) { - return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{7} + return file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP(), []int{9} } func (x *HealthCheck) GetTime() *timestamppb.Timestamp { @@ -624,7 +775,7 @@ var file_api_system_v1alpha1_zone_insight_proto_rawDesc = []byte{ 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x17, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x2f, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x22, 0x9d, 0x03, 0x0a, 0x0b, 0x5a, 0x6f, 0x6e, 0x65, 0x49, 0x6e, 0x73, 0x69, 0x67, 0x68, 0x74, + 0x22, 0xe0, 0x03, 0x0a, 0x0b, 0x5a, 0x6f, 0x6e, 0x65, 0x49, 0x6e, 0x73, 0x69, 0x67, 0x68, 0x74, 0x12, 0x4b, 0x0a, 0x0d, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4b, @@ -639,31 +790,64 @@ var file_api_system_v1alpha1_zone_insight_proto_rawDesc = []byte{ 0x5f, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, - 0x0b, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x3a, 0xa1, 0x01, 0xaa, - 0x8c, 0x89, 0xa6, 0x01, 0x15, 0x0a, 0x13, 0x5a, 0x6f, 0x6e, 0x65, 0x49, 0x6e, 0x73, 0x69, 0x67, - 0x68, 0x74, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x0d, - 0x12, 0x0b, 0x5a, 0x6f, 0x6e, 0x65, 0x49, 0x6e, 0x73, 0x69, 0x67, 0x68, 0x74, 0xaa, 0x8c, 0x89, - 0xa6, 0x01, 0x08, 0x22, 0x06, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0xaa, 0x8c, 0x89, 0xa6, 0x01, - 0x02, 0x18, 0x01, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x10, 0x3a, 0x0e, 0x0a, 0x0c, 0x7a, 0x6f, 0x6e, - 0x65, 0x2d, 0x69, 0x6e, 0x73, 0x69, 0x67, 0x68, 0x74, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x04, 0x3a, - 0x02, 0x18, 0x01, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x37, 0x52, 0x35, 0x6d, 0x6f, 0x64, 0x65, 0x6c, - 0x2e, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x64, 0x42, 0x79, 0x47, 0x6c, 0x6f, 0x62, 0x61, - 0x6c, 0x46, 0x6c, 0x61, 0x67, 0x20, 0x7c, 0x20, 0x6d, 0x6f, 0x64, 0x65, 0x6c, 0x2e, 0x50, 0x72, - 0x6f, 0x76, 0x69, 0x64, 0x65, 0x64, 0x42, 0x79, 0x5a, 0x6f, 0x6e, 0x65, 0x46, 0x6c, 0x61, 0x67, - 0x22, 0xcf, 0x01, 0x0a, 0x11, 0x45, 0x6e, 0x76, 0x6f, 0x79, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x12, 0x42, 0x0a, 0x1e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x5f, 0x64, 0x75, 0x6d, 0x70, 0x5f, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x5f, 0x69, 0x6e, 0x73, - 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x1a, - 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x75, 0x6d, 0x70, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, - 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x12, 0x37, 0x0a, 0x18, 0x73, 0x74, - 0x61, 0x74, 0x73, 0x5f, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x5f, 0x69, 0x6e, 0x73, 0x74, 0x61, - 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x15, 0x73, 0x74, - 0x61, 0x74, 0x73, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, - 0x65, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x1b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x5f, - 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x5f, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, - 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x18, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x73, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, - 0x49, 0x64, 0x22, 0x89, 0x04, 0x0a, 0x0f, 0x4b, 0x44, 0x53, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x0b, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x41, 0x0a, 0x0b, + 0x6b, 0x64, 0x73, 0x5f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x20, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, + 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4b, 0x44, 0x53, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x73, 0x52, 0x0a, 0x6b, 0x64, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x3a, + 0xa1, 0x01, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x15, 0x0a, 0x13, 0x5a, 0x6f, 0x6e, 0x65, 0x49, 0x6e, + 0x73, 0x69, 0x67, 0x68, 0x74, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0xaa, 0x8c, 0x89, + 0xa6, 0x01, 0x0d, 0x12, 0x0b, 0x5a, 0x6f, 0x6e, 0x65, 0x49, 0x6e, 0x73, 0x69, 0x67, 0x68, 0x74, + 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x08, 0x22, 0x06, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0xaa, 0x8c, + 0x89, 0xa6, 0x01, 0x02, 0x18, 0x01, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x10, 0x3a, 0x0e, 0x0a, 0x0c, + 0x7a, 0x6f, 0x6e, 0x65, 0x2d, 0x69, 0x6e, 0x73, 0x69, 0x67, 0x68, 0x74, 0xaa, 0x8c, 0x89, 0xa6, + 0x01, 0x04, 0x3a, 0x02, 0x18, 0x01, 0xaa, 0x8c, 0x89, 0xa6, 0x01, 0x37, 0x52, 0x35, 0x6d, 0x6f, + 0x64, 0x65, 0x6c, 0x2e, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x64, 0x42, 0x79, 0x47, 0x6c, + 0x6f, 0x62, 0x61, 0x6c, 0x46, 0x6c, 0x61, 0x67, 0x20, 0x7c, 0x20, 0x6d, 0x6f, 0x64, 0x65, 0x6c, + 0x2e, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x64, 0x42, 0x79, 0x5a, 0x6f, 0x6e, 0x65, 0x46, + 0x6c, 0x61, 0x67, 0x22, 0xcf, 0x01, 0x0a, 0x11, 0x45, 0x6e, 0x76, 0x6f, 0x79, 0x41, 0x64, 0x6d, + 0x69, 0x6e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x12, 0x42, 0x0a, 0x1e, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x5f, 0x64, 0x75, 0x6d, 0x70, 0x5f, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x5f, + 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x1a, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x75, 0x6d, 0x70, 0x47, 0x6c, 0x6f, + 0x62, 0x61, 0x6c, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x12, 0x37, 0x0a, + 0x18, 0x73, 0x74, 0x61, 0x74, 0x73, 0x5f, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x5f, 0x69, 0x6e, + 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x15, 0x73, 0x74, 0x61, 0x74, 0x73, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x49, 0x6e, 0x73, 0x74, + 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x1b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x73, 0x5f, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x5f, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, + 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x18, 0x63, 0x6c, 0x75, + 0x73, 0x74, 0x65, 0x72, 0x73, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x49, 0x6e, 0x73, 0x74, 0x61, + 0x6e, 0x63, 0x65, 0x49, 0x64, 0x22, 0xd0, 0x02, 0x0a, 0x0a, 0x4b, 0x44, 0x53, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x73, 0x12, 0x3b, 0x0a, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x73, 0x79, + 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4b, 0x44, + 0x53, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x73, 0x12, 0x40, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x5f, 0x64, 0x75, 0x6d, 0x70, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x73, 0x79, + 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4b, 0x44, + 0x53, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, + 0x75, 0x6d, 0x70, 0x12, 0x35, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, + 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4b, 0x44, 0x53, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x12, 0x45, 0x0a, 0x0e, 0x67, 0x6c, + 0x6f, 0x62, 0x61, 0x6c, 0x5f, 0x74, 0x6f, 0x5f, 0x7a, 0x6f, 0x6e, 0x65, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, + 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4b, 0x44, 0x53, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x52, 0x0c, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x54, 0x6f, 0x5a, 0x6f, 0x6e, + 0x65, 0x12, 0x45, 0x0a, 0x0e, 0x7a, 0x6f, 0x6e, 0x65, 0x5f, 0x74, 0x6f, 0x5f, 0x67, 0x6c, 0x6f, + 0x62, 0x61, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6b, 0x75, 0x6d, 0x61, + 0x2e, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, + 0x2e, 0x4b, 0x44, 0x53, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x0c, 0x7a, 0x6f, 0x6e, 0x65, + 0x54, 0x6f, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x22, 0x78, 0x0a, 0x09, 0x4b, 0x44, 0x53, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x2c, 0x0a, 0x12, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x5f, + 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x10, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, + 0x65, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x5f, 0x74, + 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x54, 0x69, + 0x6d, 0x65, 0x22, 0x89, 0x04, 0x0a, 0x0f, 0x4b, 0x44, 0x53, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x17, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, 0xfa, 0x42, 0x04, 0x72, 0x02, 0x10, 0x01, 0x52, 0x02, 0x69, 0x64, 0x12, 0x35, 0x0a, 0x12, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x5f, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, @@ -764,38 +948,47 @@ func file_api_system_v1alpha1_zone_insight_proto_rawDescGZIP() []byte { return file_api_system_v1alpha1_zone_insight_proto_rawDescData } -var file_api_system_v1alpha1_zone_insight_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_api_system_v1alpha1_zone_insight_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_api_system_v1alpha1_zone_insight_proto_goTypes = []interface{}{ (*ZoneInsight)(nil), // 0: kuma.system.v1alpha1.ZoneInsight (*EnvoyAdminStreams)(nil), // 1: kuma.system.v1alpha1.EnvoyAdminStreams - (*KDSSubscription)(nil), // 2: kuma.system.v1alpha1.KDSSubscription - (*KDSSubscriptionStatus)(nil), // 3: kuma.system.v1alpha1.KDSSubscriptionStatus - (*KDSServiceStats)(nil), // 4: kuma.system.v1alpha1.KDSServiceStats - (*Version)(nil), // 5: kuma.system.v1alpha1.Version - (*KumaCpVersion)(nil), // 6: kuma.system.v1alpha1.KumaCpVersion - (*HealthCheck)(nil), // 7: kuma.system.v1alpha1.HealthCheck - nil, // 8: kuma.system.v1alpha1.KDSSubscriptionStatus.StatEntry - (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp + (*KDSStreams)(nil), // 2: kuma.system.v1alpha1.KDSStreams + (*KDSStream)(nil), // 3: kuma.system.v1alpha1.KDSStream + (*KDSSubscription)(nil), // 4: kuma.system.v1alpha1.KDSSubscription + (*KDSSubscriptionStatus)(nil), // 5: kuma.system.v1alpha1.KDSSubscriptionStatus + (*KDSServiceStats)(nil), // 6: kuma.system.v1alpha1.KDSServiceStats + (*Version)(nil), // 7: kuma.system.v1alpha1.Version + (*KumaCpVersion)(nil), // 8: kuma.system.v1alpha1.KumaCpVersion + (*HealthCheck)(nil), // 9: kuma.system.v1alpha1.HealthCheck + nil, // 10: kuma.system.v1alpha1.KDSSubscriptionStatus.StatEntry + (*timestamppb.Timestamp)(nil), // 11: google.protobuf.Timestamp } var file_api_system_v1alpha1_zone_insight_proto_depIdxs = []int32{ - 2, // 0: kuma.system.v1alpha1.ZoneInsight.subscriptions:type_name -> kuma.system.v1alpha1.KDSSubscription + 4, // 0: kuma.system.v1alpha1.ZoneInsight.subscriptions:type_name -> kuma.system.v1alpha1.KDSSubscription 1, // 1: kuma.system.v1alpha1.ZoneInsight.envoy_admin_streams:type_name -> kuma.system.v1alpha1.EnvoyAdminStreams - 7, // 2: kuma.system.v1alpha1.ZoneInsight.health_check:type_name -> kuma.system.v1alpha1.HealthCheck - 9, // 3: kuma.system.v1alpha1.KDSSubscription.connect_time:type_name -> google.protobuf.Timestamp - 9, // 4: kuma.system.v1alpha1.KDSSubscription.disconnect_time:type_name -> google.protobuf.Timestamp - 3, // 5: kuma.system.v1alpha1.KDSSubscription.status:type_name -> kuma.system.v1alpha1.KDSSubscriptionStatus - 5, // 6: kuma.system.v1alpha1.KDSSubscription.version:type_name -> kuma.system.v1alpha1.Version - 9, // 7: kuma.system.v1alpha1.KDSSubscriptionStatus.last_update_time:type_name -> google.protobuf.Timestamp - 4, // 8: kuma.system.v1alpha1.KDSSubscriptionStatus.total:type_name -> kuma.system.v1alpha1.KDSServiceStats - 8, // 9: kuma.system.v1alpha1.KDSSubscriptionStatus.stat:type_name -> kuma.system.v1alpha1.KDSSubscriptionStatus.StatEntry - 6, // 10: kuma.system.v1alpha1.Version.kumaCp:type_name -> kuma.system.v1alpha1.KumaCpVersion - 9, // 11: kuma.system.v1alpha1.HealthCheck.time:type_name -> google.protobuf.Timestamp - 4, // 12: kuma.system.v1alpha1.KDSSubscriptionStatus.StatEntry.value:type_name -> kuma.system.v1alpha1.KDSServiceStats - 13, // [13:13] is the sub-list for method output_type - 13, // [13:13] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 9, // 2: kuma.system.v1alpha1.ZoneInsight.health_check:type_name -> kuma.system.v1alpha1.HealthCheck + 2, // 3: kuma.system.v1alpha1.ZoneInsight.kds_streams:type_name -> kuma.system.v1alpha1.KDSStreams + 3, // 4: kuma.system.v1alpha1.KDSStreams.clusters:type_name -> kuma.system.v1alpha1.KDSStream + 3, // 5: kuma.system.v1alpha1.KDSStreams.config_dump:type_name -> kuma.system.v1alpha1.KDSStream + 3, // 6: kuma.system.v1alpha1.KDSStreams.stats:type_name -> kuma.system.v1alpha1.KDSStream + 3, // 7: kuma.system.v1alpha1.KDSStreams.global_to_zone:type_name -> kuma.system.v1alpha1.KDSStream + 3, // 8: kuma.system.v1alpha1.KDSStreams.zone_to_global:type_name -> kuma.system.v1alpha1.KDSStream + 11, // 9: kuma.system.v1alpha1.KDSStream.connect_time:type_name -> google.protobuf.Timestamp + 11, // 10: kuma.system.v1alpha1.KDSSubscription.connect_time:type_name -> google.protobuf.Timestamp + 11, // 11: kuma.system.v1alpha1.KDSSubscription.disconnect_time:type_name -> google.protobuf.Timestamp + 5, // 12: kuma.system.v1alpha1.KDSSubscription.status:type_name -> kuma.system.v1alpha1.KDSSubscriptionStatus + 7, // 13: kuma.system.v1alpha1.KDSSubscription.version:type_name -> kuma.system.v1alpha1.Version + 11, // 14: kuma.system.v1alpha1.KDSSubscriptionStatus.last_update_time:type_name -> google.protobuf.Timestamp + 6, // 15: kuma.system.v1alpha1.KDSSubscriptionStatus.total:type_name -> kuma.system.v1alpha1.KDSServiceStats + 10, // 16: kuma.system.v1alpha1.KDSSubscriptionStatus.stat:type_name -> kuma.system.v1alpha1.KDSSubscriptionStatus.StatEntry + 8, // 17: kuma.system.v1alpha1.Version.kumaCp:type_name -> kuma.system.v1alpha1.KumaCpVersion + 11, // 18: kuma.system.v1alpha1.HealthCheck.time:type_name -> google.protobuf.Timestamp + 6, // 19: kuma.system.v1alpha1.KDSSubscriptionStatus.StatEntry.value:type_name -> kuma.system.v1alpha1.KDSServiceStats + 20, // [20:20] is the sub-list for method output_type + 20, // [20:20] is the sub-list for method input_type + 20, // [20:20] is the sub-list for extension type_name + 20, // [20:20] is the sub-list for extension extendee + 0, // [0:20] is the sub-list for field type_name } func init() { file_api_system_v1alpha1_zone_insight_proto_init() } @@ -829,7 +1022,7 @@ func file_api_system_v1alpha1_zone_insight_proto_init() { } } file_api_system_v1alpha1_zone_insight_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*KDSSubscription); i { + switch v := v.(*KDSStreams); i { case 0: return &v.state case 1: @@ -841,7 +1034,7 @@ func file_api_system_v1alpha1_zone_insight_proto_init() { } } file_api_system_v1alpha1_zone_insight_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*KDSSubscriptionStatus); i { + switch v := v.(*KDSStream); i { case 0: return &v.state case 1: @@ -853,7 +1046,7 @@ func file_api_system_v1alpha1_zone_insight_proto_init() { } } file_api_system_v1alpha1_zone_insight_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*KDSServiceStats); i { + switch v := v.(*KDSSubscription); i { case 0: return &v.state case 1: @@ -865,7 +1058,7 @@ func file_api_system_v1alpha1_zone_insight_proto_init() { } } file_api_system_v1alpha1_zone_insight_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Version); i { + switch v := v.(*KDSSubscriptionStatus); i { case 0: return &v.state case 1: @@ -877,7 +1070,7 @@ func file_api_system_v1alpha1_zone_insight_proto_init() { } } file_api_system_v1alpha1_zone_insight_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*KumaCpVersion); i { + switch v := v.(*KDSServiceStats); i { case 0: return &v.state case 1: @@ -889,6 +1082,30 @@ func file_api_system_v1alpha1_zone_insight_proto_init() { } } file_api_system_v1alpha1_zone_insight_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Version); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_api_system_v1alpha1_zone_insight_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*KumaCpVersion); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_api_system_v1alpha1_zone_insight_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*HealthCheck); i { case 0: return &v.state @@ -907,7 +1124,7 @@ func file_api_system_v1alpha1_zone_insight_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_api_system_v1alpha1_zone_insight_proto_rawDesc, NumEnums: 0, - NumMessages: 9, + NumMessages: 11, NumExtensions: 0, NumServices: 0, }, diff --git a/api/system/v1alpha1/zone_insight.proto b/api/system/v1alpha1/zone_insight.proto index 0dd4b0c4b95a..469aea5520c1 100644 --- a/api/system/v1alpha1/zone_insight.proto +++ b/api/system/v1alpha1/zone_insight.proto @@ -25,9 +25,12 @@ message ZoneInsight { repeated KDSSubscription subscriptions = 1; // Statistics about Envoy Admin Streams + // Deprecated: use kds_streams instead. EnvoyAdminStreams envoy_admin_streams = 2; HealthCheck health_check = 3; + // Information about kds streams that are estabilished between global and zone + KDSStreams kds_streams = 4; } message EnvoyAdminStreams { @@ -39,6 +42,26 @@ message EnvoyAdminStreams { string clusters_global_instance_id = 3; } +message KDSStreams { + // Details of stream that handles Clusters stream. + KDSStream clusters = 1; + // Details of stream that handles XDS Config Dump stream. + KDSStream config_dump = 2; + // Details of stream that handles Stats stream. + KDSStream stats = 3; + // Details of stream that handles global to zone resource sync stream. + KDSStream global_to_zone = 4; + // Details of stream that handles zone to global resource sync stream. + KDSStream zone_to_global = 5; +} + +message KDSStream { + // Global instance ID that handles the stream. + string global_instance_id = 1; + // Time when the stream was open. + google.protobuf.Timestamp connect_time = 2; +} + // KDSSubscription describes a single KDS subscription // created by a Zone to the Global. // Ideally, there should be only one such subscription per Zone lifecycle. diff --git a/api/system/v1alpha1/zone_insight_helpers.go b/api/system/v1alpha1/zone_insight_helpers.go index 62769724e472..34917e0444bc 100644 --- a/api/system/v1alpha1/zone_insight_helpers.go +++ b/api/system/v1alpha1/zone_insight_helpers.go @@ -40,6 +40,22 @@ func (x *ZoneInsight) IsOnline() bool { return false } +func (x *ZoneInsight) GetKDSStream(streamType string) *KDSStream { + switch streamType { + case "globalToZone": + return x.GetKdsStreams().GetGlobalToZone() + case "zoneToGlobal": + return x.GetKdsStreams().GetZoneToGlobal() + case "clusters": + return x.GetKdsStreams().GetClusters() + case "stats": + return x.GetKdsStreams().GetStats() + case "configDump": + return x.GetKdsStreams().GetConfigDump() + } + return nil +} + func (x *ZoneInsight) AllSubscriptions() []generic.Subscription { return generic.AllSubscriptions[*KDSSubscription](x) } diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index 3bf181576b8a..411b09ab06df 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -267,6 +267,7 @@ var _ = Describe("Config loader", func() { Expect(cfg.Multizone.Global.KDS.ResponseBackoff.Duration).To(Equal(time.Second)) Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration).To(Equal(11 * time.Second)) Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration).To(Equal(110 * time.Second)) + Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.CloseStaleConn).To(BeTrue()) Expect(cfg.Multizone.Global.KDS.Tracing.Enabled).To(BeFalse()) Expect(cfg.Multizone.Global.KDS.Labels.SkipPrefixes).To(Equal([]string{"argocd.argoproj.io"})) Expect(cfg.Multizone.Zone.GlobalAddress).To(Equal("grpc://1.1.1.1:5685")) @@ -624,6 +625,7 @@ multizone: zoneHealthCheck: pollInterval: 11s timeout: 110s + closeStaleConn: true tracing: enabled: false labels: @@ -974,6 +976,7 @@ meshService: "KUMA_MULTIZONE_GLOBAL_KDS_RESPONSE_BACKOFF": "1s", "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_POLL_INTERVAL": "11s", "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_TIMEOUT": "110s", + "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_CLOSE_STALE_CONN": "true", "KUMA_MULTIZONE_GLOBAL_KDS_TRACING_ENABLED": "false", "KUMA_MULTIZONE_GLOBAL_KDS_LABELS_SKIP_PREFIXES": "argocd.argoproj.io", "KUMA_MULTIZONE_ZONE_GLOBAL_ADDRESS": "grpc://1.1.1.1:5685", diff --git a/pkg/config/multizone/kds.go b/pkg/config/multizone/kds.go index 6ef756ea397d..61a232d8a281 100644 --- a/pkg/config/multizone/kds.go +++ b/pkg/config/multizone/kds.go @@ -122,6 +122,9 @@ type ZoneHealthCheckConfig struct { // Timeout is the time after the last health check that a zone counts as // no longer online Timeout config_types.Duration `json:"timeout" envconfig:"kuma_multizone_global_kds_zone_health_check_timeout"` + // CloseStaleConn determines whether to disconnect stale connections from the same zone and tenant + // that were not properly closed. + CloseStaleConn bool `json:"closeStaleConn" envconfig:"kuma_multizone_global_kds_zone_health_check_close_stale_conn"` } func (c ZoneHealthCheckConfig) Validate() error { diff --git a/pkg/intercp/envoyadmin/forwarding_kds_client.go b/pkg/intercp/envoyadmin/forwarding_kds_client.go index 7ee1188f0116..f81a494f91e1 100644 --- a/pkg/intercp/envoyadmin/forwarding_kds_client.go +++ b/pkg/intercp/envoyadmin/forwarding_kds_client.go @@ -168,15 +168,27 @@ func (f *forwardingKdsEnvoyAdminClient) globalInstanceID(ctx context.Context, zo if !zoneInsightRes.Spec.IsOnline() { return "", &ZoneOfflineError{rpcName: rpcName} } - streams := zoneInsightRes.Spec.GetEnvoyAdminStreams() + streams := zoneInsightRes.Spec.GetKdsStreams() var globalInstanceID string switch rpcName { case service.ConfigDumpRPC: - globalInstanceID = streams.GetConfigDumpGlobalInstanceId() + if streams.GetConfigDump() != nil { + globalInstanceID = streams.GetConfigDump().GetGlobalInstanceId() + } else { + globalInstanceID = zoneInsightRes.Spec.GetEnvoyAdminStreams().GetConfigDumpGlobalInstanceId() + } case service.StatsRPC: - globalInstanceID = streams.GetStatsGlobalInstanceId() + if streams.GetStats() != nil { + globalInstanceID = streams.GetStats().GetGlobalInstanceId() + } else { + globalInstanceID = zoneInsightRes.Spec.GetEnvoyAdminStreams().GetStatsGlobalInstanceId() + } case service.ClustersRPC: - globalInstanceID = streams.GetClustersGlobalInstanceId() + if streams.GetClusters() != nil { + globalInstanceID = streams.GetClusters().GetGlobalInstanceId() + } else { + globalInstanceID = zoneInsightRes.Spec.GetEnvoyAdminStreams().GetClustersGlobalInstanceId() + } default: return "", errors.Errorf("invalid operation %s", rpcName) } diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index d5d0041edda8..81846edf3416 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -153,6 +153,9 @@ func Setup(rt runtime.Runtime) error { rt.KDSContext().GlobalServerFiltersV2, rt.Extensions(), rt.EventBus(), + rt.ResourceManager(), + rt.Config().Store.Upsert, + rt.GetInstanceId(), ), ), rt.Config().General.ResilientComponentBaseBackoff.Duration, diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index 0023cad5d279..dc63a2163aa9 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -15,7 +15,6 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding/gzip" - "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -68,11 +67,11 @@ func (c *client) Start(stop <-chan struct{}) (errs error) { grpc.UseCompressor(gzip.Name), grpc.MaxCallSendMsgSize(int(c.config.MaxMsgSize)), grpc.MaxCallRecvMsgSize(int(c.config.MaxMsgSize))), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: grpcKeepAliveTime, - Timeout: grpcKeepAliveTime, - PermitWithoutStream: true, - }), + // grpc.WithKeepaliveParams(keepalive.ClientParameters{ + // Time: grpcKeepAliveTime, + // Timeout: grpcKeepAliveTime, + // PermitWithoutStream: true, + // }), ) switch u.Scheme { case "grpc": diff --git a/pkg/kds/mux/zone_sync.go b/pkg/kds/mux/zone_sync.go index 87f8066552ba..bfa5fce0be80 100644 --- a/pkg/kds/mux/zone_sync.go +++ b/pkg/kds/mux/zone_sync.go @@ -2,20 +2,30 @@ package mux import ( "context" + "math/rand" + "time" "github.com/pkg/errors" + "github.com/sethvargo/go-retry" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" + "github.com/kumahq/kuma/api/system/v1alpha1" + config_store "github.com/kumahq/kuma/pkg/config/core/resources/store" "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/resources/apis/system" + "github.com/kumahq/kuma/pkg/core/resources/manager" + "github.com/kumahq/kuma/pkg/core/resources/model" + core_store "github.com/kumahq/kuma/pkg/core/resources/store" "github.com/kumahq/kuma/pkg/events" "github.com/kumahq/kuma/pkg/kds" "github.com/kumahq/kuma/pkg/kds/service" "github.com/kumahq/kuma/pkg/kds/util" "github.com/kumahq/kuma/pkg/log" "github.com/kumahq/kuma/pkg/multitenant" + "github.com/kumahq/kuma/pkg/util/proto" ) type FilterV2 interface { @@ -44,10 +54,13 @@ type KDSSyncServiceServer struct { extensions context.Context eventBus events.EventBus mesh_proto.UnimplementedKDSSyncServiceServer - context context.Context + context context.Context + resManager manager.ResourceManager + upsertCfg config_store.UpsertConfig + instanceID string } -func NewKDSSyncServiceServer(ctx context.Context, globalToZoneCb OnGlobalToZoneSyncConnectFunc, zoneToGlobalCb OnZoneToGlobalSyncConnectFunc, filters []FilterV2, extensions context.Context, eventBus events.EventBus) *KDSSyncServiceServer { +func NewKDSSyncServiceServer(ctx context.Context, globalToZoneCb OnGlobalToZoneSyncConnectFunc, zoneToGlobalCb OnZoneToGlobalSyncConnectFunc, filters []FilterV2, extensions context.Context, eventBus events.EventBus, resManager manager.ResourceManager, upsertCfg config_store.UpsertConfig, instanceID string) *KDSSyncServiceServer { return &KDSSyncServiceServer{ context: ctx, globalToZoneCb: globalToZoneCb, @@ -55,6 +68,9 @@ func NewKDSSyncServiceServer(ctx context.Context, globalToZoneCb OnGlobalToZoneS filters: filters, extensions: extensions, eventBus: eventBus, + resManager: resManager, + upsertCfg: upsertCfg, + instanceID: instanceID, } } @@ -66,14 +82,15 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService if err != nil { return err } - logger = logger.WithValues("clientID", zone) + logger = logger.WithValues("clientID", zone, "type", "globalToZone") for _, filter := range g.filters { if err := filter.InterceptServerStream(stream); err != nil { return errors.Wrap(err, "closing KDS stream following a callback error") } } - shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone) + connectTime := time.Now() + shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone, service.GlobalToZone, connectTime) defer shouldDisconnectStream.Close() processingErrorsCh := make(chan error, 1) @@ -82,6 +99,14 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService processingErrorsCh <- err } }() + if err := g.storeStreamConnection(stream.Context(), zone, service.GlobalToZone, connectTime); err != nil { + if errors.Is(err, context.Canceled) && errors.Is(stream.Context().Err(), context.Canceled) { + return status.Error(codes.Canceled, "stream was cancelled") + } + logger.Error(err, "could not store stream connection") + return status.Error(codes.Internal, "could not store stream connection") + } + logger.Info("stored stream connection") select { case <-shouldDisconnectStream.Recv(): @@ -108,14 +133,14 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService if err != nil { return err } - logger = logger.WithValues("clientID", zone) + logger = logger.WithValues("clientID", zone, "type", "zoneToGlobal") for _, filter := range g.filters { if err := filter.InterceptServerStream(stream); err != nil { return errors.Wrap(err, "closing KDS stream following a callback error") } } - - shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone) + connectTime := time.Now() + shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone, service.ZoneToGlobal, connectTime) defer shouldDisconnectStream.Close() processingErrorsCh := make(chan error, 1) @@ -125,6 +150,15 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService } }() + if err := g.storeStreamConnection(stream.Context(), zone, service.ZoneToGlobal, connectTime); err != nil { + if errors.Is(err, context.Canceled) && errors.Is(stream.Context().Err(), context.Canceled) { + return status.Error(codes.Canceled, "stream was cancelled") + } + logger.Error(err, "could not store stream connection") + return status.Error(codes.Internal, "could not store stream connection") + } + logger.Info("stored stream connection") + select { case <-shouldDisconnectStream.Recv(): logger.Info("ending stream, zone health check failed") @@ -144,18 +178,68 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService } } -func (g *KDSSyncServiceServer) watchZoneHealthCheck(streamContext context.Context, zone string) events.Listener { +func (g *KDSSyncServiceServer) watchZoneHealthCheck(streamContext context.Context, zone string, typ service.StreamType, connectTime time.Time) events.Listener { tenantID, _ := multitenant.TenantFromCtx(streamContext) shouldDisconnectStream := events.NewNeverListener() - if kds.ContextHasFeature(streamContext, kds.FeatureZonePingHealth) { shouldDisconnectStream = g.eventBus.Subscribe(func(e events.Event) bool { - disconnectEvent, ok := e.(service.ZoneWentOffline) - return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone + switch event := e.(type) { + case service.ZoneWentOffline: + return event.TenantID == tenantID && event.Zone == zone + case service.StreamCancelled: + return event.TenantID == tenantID && event.Zone == zone && event.Type == typ && event.ConnTime == connectTime + default: + return false + } }) - g.eventBus.Send(service.ZoneOpenedStream{Zone: zone, TenantID: tenantID}) + g.eventBus.Send(service.ZoneOpenedStream{Zone: zone, TenantID: tenantID, Type: typ, ConnTime: connectTime}) } return shouldDisconnectStream } + +func (g *KDSSyncServiceServer) storeStreamConnection(ctx context.Context, zone string, typ service.StreamType, connectTime time.Time) error { + key := model.ResourceKey{Name: zone} + + // wait for Zone to be created, only then we can create Zone Insight + err := retry.Do( + ctx, + retry.WithMaxRetries(30, retry.NewConstant(1*time.Second)), + func(ctx context.Context) error { + return retry.RetryableError(g.resManager.Get(ctx, system.NewZoneResource(), core_store.GetBy(key))) + }, + ) + if err != nil { + return err + } + + // Add delay for Upsert. If Global CP is behind an HTTP load balancer, + // it might be the case that each Envoy Admin stream will land on separate instance. + // In this case, all instances will try to update Zone Insight which will result in conflicts. + // Since it's unusual to immediately execute envoy admin rpcs after zone is connected, 0-10s delay should be fine. + // #nosec G404 - math rand is enough + time.Sleep(time.Duration(rand.Int31n(10000)) * time.Millisecond) + + zoneInsight := system.NewZoneInsightResource() + return manager.Upsert(ctx, g.resManager, key, zoneInsight, func(resource model.Resource) error { + if zoneInsight.Spec.KdsStreams == nil { + zoneInsight.Spec.KdsStreams = &v1alpha1.KDSStreams{} + } + stream := zoneInsight.Spec.GetKDSStream(string(typ)) + if stream == nil { + stream = &v1alpha1.KDSStream{} + } + if stream.GetConnectTime() == nil || proto.MustTimestampFromProto(stream.ConnectTime).Before(connectTime) { + stream.GlobalInstanceId = g.instanceID + stream.ConnectTime = proto.MustTimestampProto(connectTime) + } + switch typ { + case service.GlobalToZone: + zoneInsight.Spec.KdsStreams.GlobalToZone = stream + case service.ZoneToGlobal: + zoneInsight.Spec.KdsStreams.ZoneToGlobal = stream + } + return nil + }, manager.WithConflictRetry(g.upsertCfg.ConflictRetryBaseBackoff.Duration, g.upsertCfg.ConflictRetryMaxTimes, g.upsertCfg.ConflictRetryJitterPercent)) // we need retry because zone sink or other RPC may also update the insight. +} diff --git a/pkg/kds/mux/zone_watch.go b/pkg/kds/mux/zone_watch.go index 36b65093798c..bac053375d63 100644 --- a/pkg/kds/mux/zone_watch.go +++ b/pkg/kds/mux/zone_watch.go @@ -7,6 +7,7 @@ import ( "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" + system_proto "github.com/kumahq/kuma/api/system/v1alpha1" "github.com/kumahq/kuma/pkg/config/multizone" "github.com/kumahq/kuma/pkg/core" "github.com/kumahq/kuma/pkg/core/resources/apis/system" @@ -18,6 +19,7 @@ import ( kuma_log "github.com/kumahq/kuma/pkg/log" core_metrics "github.com/kumahq/kuma/pkg/metrics" "github.com/kumahq/kuma/pkg/multitenant" + "github.com/kumahq/kuma/pkg/util/proto" ) type zoneTenant struct { @@ -26,14 +28,16 @@ type zoneTenant struct { } type ZoneWatch struct { - log logr.Logger - poll time.Duration - timeout time.Duration - bus events.EventBus - extensions context.Context - rm manager.ReadOnlyResourceManager - summary prometheus.Summary - zones map[zoneTenant]time.Time + log logr.Logger + poll time.Duration + timeout time.Duration + bus events.EventBus + extensions context.Context + rm manager.ReadOnlyResourceManager + summary prometheus.Summary + zones map[zoneTenant]time.Time + zoneStreams map[zoneTenant]map[service.StreamType]time.Time + closeStaleConn bool } func NewZoneWatch( @@ -54,14 +58,16 @@ func NewZoneWatch( } return &ZoneWatch{ - log: log, - poll: cfg.PollInterval.Duration, - timeout: cfg.Timeout.Duration, - bus: bus, - extensions: extensions, - rm: rm, - summary: summary, - zones: map[zoneTenant]time.Time{}, + log: log, + poll: cfg.PollInterval.Duration, + timeout: cfg.Timeout.Duration, + bus: bus, + extensions: extensions, + rm: rm, + summary: summary, + zones: map[zoneTenant]time.Time{}, + zoneStreams: map[zoneTenant]map[service.StreamType]time.Time{}, + closeStaleConn: cfg.CloseStaleConn, }, nil } @@ -91,6 +97,7 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error { TenantID: zone.tenantID, }) delete(zw.zones, zone) + delete(zw.zoneStreams, zone) } else { log.Info("error getting ZoneInsight", "zone", zone.zone, "error", err) } @@ -110,11 +117,23 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error { TenantID: zone.tenantID, }) delete(zw.zones, zone) + delete(zw.zoneStreams, zone) + } + // Now we want to check individual stream + if zw.closeStaleConn { + zw.cleanupStaleConnections(zone, zoneInsight) } } zw.summary.Observe(float64(core.Now().Sub(start).Milliseconds())) case e := <-connectionWatch.Recv(): newStream := e.(service.ZoneOpenedStream) + // Disconnect the old stream. + // There should not be two streams for the same zone, as only the leader can connect to the global control plane. + // Instead, we generate a unique StreamID for each stream. If we detect a second control plane + // connecting to the same zone with a different StreamID, we cancel the previous stream. + if zw.closeStaleConn { + zw.closeStaleConnectionOnConnect(newStream) + } // We keep a record of the time we open a stream. // This is to prevent the zone from timing out on a poll @@ -136,6 +155,66 @@ func (zw *ZoneWatch) Start(stop <-chan struct{}) error { } } +func (zw *ZoneWatch) closeStaleConnectionOnConnect(newStream service.ZoneOpenedStream) { + streams, found := zw.zoneStreams[zoneTenant{tenantID: newStream.TenantID, zone: newStream.Zone}] + if found { + if prevConnTime, exist := streams[newStream.Type]; exist && prevConnTime.Before(newStream.ConnTime) { + ctx := multitenant.WithTenant(context.TODO(), newStream.TenantID) + log := kuma_log.AddFieldsFromCtx(zw.log, ctx, zw.extensions) + log.Info("the same zone has connected but the previous connection wasn't closed, closing", + "zone", newStream.Zone, "streamType", newStream.Type, "previouslyConnected", prevConnTime, "currentlyConnected", newStream.ConnTime) + zw.bus.Send(service.StreamCancelled{ + Zone: newStream.Zone, + TenantID: newStream.TenantID, + Type: newStream.Type, + ConnTime: prevConnTime, + }) + zw.zoneStreams[zoneTenant{tenantID: newStream.TenantID, zone: newStream.Zone}][newStream.Type] = newStream.ConnTime + } + } else { + zw.zoneStreams[zoneTenant{tenantID: newStream.TenantID, zone: newStream.Zone}] = map[service.StreamType]time.Time{ + newStream.Type: newStream.ConnTime, + } + } +} + +func (zw *ZoneWatch) cleanupStaleConnections(zone zoneTenant, zoneInsight *system.ZoneInsightResource) { + for stream, connOpenTime := range zw.zoneStreams[zone] { + var conf *system_proto.KDSStream + switch stream { + case service.Clusters: + conf = zoneInsight.Spec.GetKdsStreams().GetClusters() + case service.ConfigDump: + conf = zoneInsight.Spec.GetKdsStreams().GetConfigDump() + case service.Stats: + conf = zoneInsight.Spec.GetKdsStreams().GetStats() + case service.GlobalToZone: + conf = zoneInsight.Spec.GetKdsStreams().GetGlobalToZone() + case service.ZoneToGlobal: + conf = zoneInsight.Spec.GetKdsStreams().GetZoneToGlobal() + } + if conf == nil { + continue + } + // If we have a connection that started before the one from insight, cancel the stream. + // There's no need to check globalId since the connection exists in the map, meaning it is local. + activeStreamConnTime := proto.MustTimestampFromProto(conf.GetConnectTime()) + if connOpenTime.Before(*activeStreamConnTime) { + ctx := multitenant.WithTenant(context.TODO(), zone.tenantID) + log := kuma_log.AddFieldsFromCtx(zw.log, ctx, zw.extensions) + log.Info("the same zone has connected but the previous connection wasn't closed, closing", + "zone", zone.zone, "streamType", stream, "previouslyConnected", connOpenTime, "currentlyConnected", activeStreamConnTime) + zw.bus.Send(service.StreamCancelled{ + Zone: zone.zone, + TenantID: zone.tenantID, + Type: stream, + ConnTime: connOpenTime, + }) + delete(zw.zoneStreams, zone) + } + } +} + func (zw *ZoneWatch) NeedLeaderElection() bool { return false } diff --git a/pkg/kds/mux/zone_watch_test.go b/pkg/kds/mux/zone_watch_test.go index e56f81727cb7..816933874f68 100644 --- a/pkg/kds/mux/zone_watch_test.go +++ b/pkg/kds/mux/zone_watch_test.go @@ -62,8 +62,9 @@ var _ = Describe("ZoneWatch", func() { Expect(err).NotTo(HaveOccurred()) cfg := multizone.ZoneHealthCheckConfig{ - PollInterval: types.Duration{Duration: pollInterval}, - Timeout: types.Duration{Duration: timeout}, + PollInterval: types.Duration{Duration: pollInterval}, + Timeout: types.Duration{Duration: timeout}, + CloseStaleConn: true, } rm = manager.NewResourceManager(memory.NewStore()) @@ -98,8 +99,14 @@ var _ = Describe("ZoneWatch", func() { stop = make(chan struct{}) timeouts = eventBus.Subscribe(func(event events.Event) bool { - _, ok := event.(service.ZoneWentOffline) - return ok + switch event.(type) { + case service.ZoneWentOffline: + return true + case service.StreamCancelled: + return true + default: + return false + } }) errCh = make(chan error, 1) @@ -224,4 +231,106 @@ var _ = Describe("ZoneWatch", func() { Zone: zone, }))) }) + It("should disconnect current stream when the same zone connects", func() { + zoneInsight := system.NewZoneInsightResource() + Expect(rm.Get( + context.Background(), + zoneInsight, + store.GetByKey(zone, core_model.NoMesh), + )).To(Succeed()) + zoneInsight.Spec.HealthCheck = &system_proto.HealthCheck{ + Time: timestamppb.New(time.Now()), + } + Expect(rm.Update( + context.Background(), + zoneInsight, + )).To(Succeed()) + + firstConnectTime := time.Now() + eventBus.Send(service.ZoneOpenedStream{ + TenantID: "", + Zone: zone, + Type: service.GlobalToZone, + ConnTime: firstConnectTime, + }) + + // wait for opened stream to be registered + // in real conditions the interval will be large enough + // that these events will almost certainly be handled + // by the ZoneWatch loop between polls and before the timeout + time.Sleep(1 * pollInterval) + + Expect(timeouts.Recv()).NotTo(Receive()) + + // try to connect the same zone but on the 2nd stream + eventBus.Send(service.ZoneOpenedStream{ + TenantID: "", + Zone: zone, + Type: service.GlobalToZone, + ConnTime: time.Now(), + }) + time.Sleep(1 * pollInterval) + + Eventually(timeouts.Recv(), 2*pollInterval).Should(Receive(Equal(service.StreamCancelled{ + TenantID: "", + Zone: zone, + Type: service.GlobalToZone, + ConnTime: firstConnectTime, + }))) + }) + It("should disconnect current stream when newer connection exists", func() { + stopPing := make(chan struct{}) + oldConnection := time.Now() + zoneInsight := system.NewZoneInsightResource() + Expect(rm.Get( + context.Background(), + zoneInsight, + store.GetByKey(zone, core_model.NoMesh), + )).To(Succeed()) + zoneInsight.Spec.HealthCheck = &system_proto.HealthCheck{ + Time: timestamppb.New(time.Now()), + } + zoneInsight.Spec.KdsStreams = &system_proto.KDSStreams{ + GlobalToZone: &system_proto.KDSStream{ + GlobalInstanceId: "1", + ConnectTime: timestamppb.New(time.Now()), + }, + } + Expect(rm.Update( + context.Background(), + zoneInsight, + )).To(Succeed()) + + // Start a Goroutine for periodic health check pings + go func() { + for { + select { + case <-stopPing: + return + default: + sendHealthCheckPing(rm, zone) + time.Sleep(50 * time.Millisecond) + } + } + }() + + // create a new connection which has time older than previous connection + + eventBus.Send(service.ZoneOpenedStream{ + TenantID: "", + Zone: zone, + Type: service.GlobalToZone, + ConnTime: oldConnection, + }) + + // expect to cancel previous stream + Eventually(timeouts.Recv(), zoneWentOfflineCheckTimeout).Should(Receive(Equal(service.StreamCancelled{ + TenantID: "", + Zone: zone, + Type: service.GlobalToZone, + ConnTime: oldConnection, + }))) + + close(stopPing) + }) }) diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index 8f37fa153726..5ab88d51ae21 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -18,7 +18,6 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" - "github.com/kumahq/kuma/api/system/v1alpha1" system_proto "github.com/kumahq/kuma/api/system/v1alpha1" config_store "github.com/kumahq/kuma/pkg/config/core/resources/store" "github.com/kumahq/kuma/pkg/core" @@ -32,6 +31,7 @@ import ( kuma_log "github.com/kumahq/kuma/pkg/log" "github.com/kumahq/kuma/pkg/multitenant" util_grpc "github.com/kumahq/kuma/pkg/util/grpc" + "github.com/kumahq/kuma/pkg/util/proto" ) var log = core.Log.WithName("kds-service") @@ -115,13 +115,32 @@ func (g *GlobalKDSServiceServer) HealthCheck(ctx context.Context, _ *mesh_proto. }, nil } +type StreamType string + +var ( + Clusters StreamType = "clusters" + ConfigDump StreamType = "configDump" + Stats StreamType = "stats" + GlobalToZone StreamType = "globalToZone" + ZoneToGlobal StreamType = "zoneToGlobal" +) + type ZoneWentOffline struct { TenantID string Zone string + Type StreamType +} +type StreamCancelled struct { + TenantID string + Zone string + Type StreamType + ConnTime time.Time } type ZoneOpenedStream struct { TenantID string Zone string + Type StreamType + ConnTime time.Time } func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( @@ -152,19 +171,26 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( shouldDisconnectStream := events.NewNeverListener() md, _ := metadata.FromIncomingContext(stream.Context()) features := md.Get(kds.FeaturesMetadataKey) - + connectTime := time.Now() + streamType := GetStreamType(rpcName) if slices.Contains(features, kds.FeatureZonePingHealth) { shouldDisconnectStream = g.eventBus.Subscribe(func(e events.Event) bool { - disconnectEvent, ok := e.(ZoneWentOffline) - return ok && disconnectEvent.TenantID == tenantZoneID.TenantID && disconnectEvent.Zone == zone + switch event := e.(type) { + case ZoneWentOffline: + return event.TenantID == tenantZoneID.TenantID && event.Zone == zone + case StreamCancelled: + return event.TenantID == tenantZoneID.TenantID && event.Zone == zone && event.Type == streamType && event.ConnTime == connectTime + default: + return false + } }) - g.eventBus.Send(ZoneOpenedStream{Zone: zone, TenantID: tenantZoneID.TenantID}) + g.eventBus.Send(ZoneOpenedStream{Zone: zone, TenantID: tenantZoneID.TenantID, Type: streamType, ConnTime: connectTime}) } defer shouldDisconnectStream.Close() logger.Info("Envoy Admin RPC stream started") rpc.ClientConnected(tenantZoneID.String(), stream) - if err := g.storeStreamConnection(stream.Context(), zone, rpcName, g.instanceID); err != nil { + if err := g.storeStreamConnection(stream.Context(), zone, streamType, connectTime); err != nil { if errors.Is(err, context.Canceled) && errors.Is(stream.Context().Err(), context.Canceled) { return status.Error(codes.Canceled, "stream was cancelled") } @@ -211,7 +237,19 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( } } -func (g *GlobalKDSServiceServer) storeStreamConnection(ctx context.Context, zone string, rpcName string, instance string) error { +func GetStreamType(rpcName string) StreamType { + switch rpcName { + case ClustersRPC: + return Clusters + case ConfigDumpRPC: + return ConfigDump + case StatsRPC: + return Stats + } + return StreamType("NotSupported") +} + +func (g *GlobalKDSServiceServer) storeStreamConnection(ctx context.Context, zone string, streamType StreamType, connectTime time.Time) error { key := model.ResourceKey{Name: zone} // wait for Zone to be created, only then we can create Zone Insight @@ -236,15 +274,29 @@ func (g *GlobalKDSServiceServer) storeStreamConnection(ctx context.Context, zone zoneInsight := system.NewZoneInsightResource() return manager.Upsert(ctx, g.resManager, key, zoneInsight, func(resource model.Resource) error { if zoneInsight.Spec.EnvoyAdminStreams == nil { - zoneInsight.Spec.EnvoyAdminStreams = &v1alpha1.EnvoyAdminStreams{} + zoneInsight.Spec.EnvoyAdminStreams = &system_proto.EnvoyAdminStreams{} + } + if zoneInsight.Spec.KdsStreams == nil { + zoneInsight.Spec.KdsStreams = &system_proto.KDSStreams{} + } + stream := zoneInsight.Spec.GetKDSStream(string(streamType)) + if stream == nil { + stream = &system_proto.KDSStream{} + } + if stream.GetConnectTime() == nil || proto.MustTimestampFromProto(stream.ConnectTime).Before(connectTime) { + stream.GlobalInstanceId = g.instanceID + stream.ConnectTime = proto.MustTimestampProto(connectTime) } - switch rpcName { - case ConfigDumpRPC: - zoneInsight.Spec.EnvoyAdminStreams.ConfigDumpGlobalInstanceId = instance - case StatsRPC: - zoneInsight.Spec.EnvoyAdminStreams.StatsGlobalInstanceId = instance - case ClustersRPC: - zoneInsight.Spec.EnvoyAdminStreams.ClustersGlobalInstanceId = instance + switch streamType { + case ConfigDump: + zoneInsight.Spec.EnvoyAdminStreams.ConfigDumpGlobalInstanceId = g.instanceID + zoneInsight.Spec.KdsStreams.ConfigDump = stream + case Stats: + zoneInsight.Spec.EnvoyAdminStreams.StatsGlobalInstanceId = g.instanceID + zoneInsight.Spec.KdsStreams.Stats = stream + case Clusters: + zoneInsight.Spec.EnvoyAdminStreams.ClustersGlobalInstanceId = g.instanceID + zoneInsight.Spec.KdsStreams.Clusters = stream } return nil }, manager.WithConflictRetry(g.upsertCfg.ConflictRetryBaseBackoff.Duration, g.upsertCfg.ConflictRetryMaxTimes, g.upsertCfg.ConflictRetryJitterPercent)) // we need retry because zone sink or other RPC may also update the insight.