From e6ab6530876a36b3e9c735451d7f4fab9fb77ab7 Mon Sep 17 00:00:00 2001 From: Mateusz Szostok Date: Mon, 23 Oct 2023 21:19:28 +0200 Subject: [PATCH 1/2] Add cloud teams integration --- cmd/botkube-agent/main.go | 8 + internal/plugin/collector.go | 51 ++- internal/source/scheduler.go | 9 + pkg/api/cloudplatform/creds.go | 45 ++ .../{cloudslack.pb.go => cloud_slack.pb.go} | 100 ++--- ...lack_grpc.pb.go => cloud_slack_grpc.pb.go} | 4 +- pkg/api/cloudteams/cloud_teams.pb.go | 349 +++++++++++++++ pkg/api/cloudteams/cloud_teams_grpc.pb.go | 141 ++++++ pkg/bot/bot.go | 7 + pkg/bot/slack_cloud.go | 66 +-- pkg/bot/teams_cloud.go | 421 ++++++++++++++++++ pkg/bot/teams_cloud_grpc.go | 178 ++++++++ pkg/config/config.go | 49 +- proto/{cloudslack.proto => cloud_slack.proto} | 0 proto/cloud_teams.proto | 28 ++ 15 files changed, 1310 insertions(+), 146 deletions(-) create mode 100644 pkg/api/cloudplatform/creds.go rename pkg/api/cloudslack/{cloudslack.pb.go => cloud_slack.pb.go} (57%) rename pkg/api/cloudslack/{cloudslack_grpc.pb.go => cloud_slack_grpc.pb.go} (98%) create mode 100644 pkg/api/cloudteams/cloud_teams.pb.go create mode 100644 pkg/api/cloudteams/cloud_teams_grpc.pb.go create mode 100644 pkg/bot/teams_cloud.go create mode 100644 pkg/bot/teams_cloud_grpc.go rename proto/{cloudslack.proto => cloud_slack.proto} (100%) create mode 100644 proto/cloud_teams.proto diff --git a/cmd/botkube-agent/main.go b/cmd/botkube-agent/main.go index 327197fb5..38cd32629 100644 --- a/cmd/botkube-agent/main.go +++ b/cmd/botkube-agent/main.go @@ -299,6 +299,14 @@ func run(ctx context.Context) (err error) { scheduleBotNotifier(tb) } + if commGroupCfg.CloudTeams.Enabled { + sb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, reporter) + if err != nil { + return reportFatalError("while creating CloudSlack bot", err) + } + scheduleBotNotifier(sb) + } + if commGroupCfg.Discord.Enabled { db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, reporter) if err != nil { diff --git a/internal/plugin/collector.go b/internal/plugin/collector.go index 4ca5c0396..6ac95aa7f 100644 --- a/internal/plugin/collector.go +++ b/internal/plugin/collector.go @@ -17,6 +17,22 @@ func NewCollector(log logrus.FieldLogger) *Collector { return &Collector{log: log} } +type botBindingsGetter interface { + config.Identifiable + GetBotBindings() config.BotBindings +} + +func collect[T botBindingsGetter](boundExecutors, boundSources map[string]struct{}, channels config.IdentifiableMap[T]) { + for _, bindings := range channels { + for _, name := range bindings.GetBotBindings().Executors { + boundExecutors[name] = struct{}{} + } + for _, name := range bindings.GetBotBindings().Sources { + boundSources[name] = struct{}{} + } + } +} + // GetAllEnabledAndUsedPlugins returns the list of all plugins that are both enabled and bind to at // least one communicator or action (automation) that is enabled. func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) ([]string, []string) { @@ -25,33 +41,21 @@ func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) ([]string, [ boundSources = map[string]struct{}{} ) - // Collect all used executors/sources by communication platforms - collect := func(channels config.IdentifiableMap[config.ChannelBindingsByName]) { - for _, bindings := range channels { - for _, name := range bindings.Bindings.Executors { - boundExecutors[name] = struct{}{} - } - for _, name := range bindings.Bindings.Sources { - boundSources[name] = struct{}{} - } - } - } - for _, commGroupCfg := range cfg.Communications { if commGroupCfg.Slack.Enabled { - collect(commGroupCfg.Slack.Channels) + collect(boundExecutors, boundSources, commGroupCfg.Slack.Channels) } if commGroupCfg.SocketSlack.Enabled { - collect(commGroupCfg.SocketSlack.Channels) + collect(boundExecutors, boundSources, commGroupCfg.SocketSlack.Channels) } if commGroupCfg.CloudSlack.Enabled { - collect(commGroupCfg.CloudSlack.Channels) + collect(boundExecutors, boundSources, commGroupCfg.CloudSlack.Channels) } if commGroupCfg.Mattermost.Enabled { - collect(commGroupCfg.Mattermost.Channels) + collect(boundExecutors, boundSources, commGroupCfg.Mattermost.Channels) } if commGroupCfg.Teams.Enabled { @@ -63,17 +67,16 @@ func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) ([]string, [ } } - if commGroupCfg.Discord.Enabled { - for _, bindings := range commGroupCfg.Discord.Channels { - for _, name := range bindings.Bindings.Executors { - boundExecutors[name] = struct{}{} - } - for _, name := range bindings.Bindings.Sources { - boundSources[name] = struct{}{} - } + if commGroupCfg.CloudTeams.Enabled { + for _, team := range commGroupCfg.CloudTeams.Teams { + collect(boundExecutors, boundSources, team.Channels) } } + if commGroupCfg.Discord.Enabled { + collect(boundExecutors, boundSources, commGroupCfg.Discord.Channels) + } + if commGroupCfg.Webhook.Enabled { for _, name := range commGroupCfg.Webhook.Bindings.Sources { boundSources[name] = struct{}{} diff --git a/internal/source/scheduler.go b/internal/source/scheduler.go index dfbfd355f..1541b61b9 100644 --- a/internal/source/scheduler.go +++ b/internal/source/scheduler.go @@ -202,6 +202,15 @@ func (d *Scheduler) generateConfigs(ctx context.Context) error { return err } } + if commGroupCfg.CloudTeams.Enabled { + for _, teams := range commGroupCfg.CloudTeams.Teams { + for _, channel := range teams.Channels { + if err := d.generateSourceConfigs(ctx, config.CloudTeamsCommPlatformIntegration.IsInteractive(), channel.Bindings.Sources); err != nil { + return err + } + } + } + } if commGroupCfg.Discord.Enabled { for _, channel := range commGroupCfg.Discord.Channels { diff --git a/pkg/api/cloudplatform/creds.go b/pkg/api/cloudplatform/creds.go new file mode 100644 index 000000000..7bbffea3c --- /dev/null +++ b/pkg/api/cloudplatform/creds.go @@ -0,0 +1,45 @@ +package cloudplatform + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + + "github.com/kubeshop/botkube/internal/config/remote" +) + +const ( + APIKeyContextKey = "X-Api-Key" // #nosec + DeploymentIDContextKey = "X-Deployment-Id" // #nosec +) + +func AddStreamingClientCredentials(remoteCfg remote.Config) grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + md := metadata.New(map[string]string{ + APIKeyContextKey: remoteCfg.APIKey, + DeploymentIDContextKey: remoteCfg.Identifier, + }) + + ctx = metadata.NewOutgoingContext(ctx, md) + + clientStream, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + return nil, err + } + + return clientStream, nil + } +} + +func AddUnaryClientCredentials(remoteCfg remote.Config) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + md := metadata.New(map[string]string{ + APIKeyContextKey: remoteCfg.APIKey, + DeploymentIDContextKey: remoteCfg.Identifier, + }) + + ctx = metadata.NewOutgoingContext(ctx, md) + return invoker(ctx, method, req, reply, cc, opts...) + } +} diff --git a/pkg/api/cloudslack/cloudslack.pb.go b/pkg/api/cloudslack/cloud_slack.pb.go similarity index 57% rename from pkg/api/cloudslack/cloudslack.pb.go rename to pkg/api/cloudslack/cloud_slack.pb.go index 1aff4b332..98a43e4fe 100644 --- a/pkg/api/cloudslack/cloudslack.pb.go +++ b/pkg/api/cloudslack/cloud_slack.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.31.0 // protoc v4.24.0 -// source: cloudslack.proto +// source: cloud_slack.proto package cloudslack @@ -32,7 +32,7 @@ type ConnectRequest struct { func (x *ConnectRequest) Reset() { *x = ConnectRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cloudslack_proto_msgTypes[0] + mi := &file_cloud_slack_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -45,7 +45,7 @@ func (x *ConnectRequest) String() string { func (*ConnectRequest) ProtoMessage() {} func (x *ConnectRequest) ProtoReflect() protoreflect.Message { - mi := &file_cloudslack_proto_msgTypes[0] + mi := &file_cloud_slack_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -58,7 +58,7 @@ func (x *ConnectRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ConnectRequest.ProtoReflect.Descriptor instead. func (*ConnectRequest) Descriptor() ([]byte, []int) { - return file_cloudslack_proto_rawDescGZIP(), []int{0} + return file_cloud_slack_proto_rawDescGZIP(), []int{0} } func (x *ConnectRequest) GetInstanceId() string { @@ -89,7 +89,7 @@ type ConnectResponse struct { func (x *ConnectResponse) Reset() { *x = ConnectResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cloudslack_proto_msgTypes[1] + mi := &file_cloud_slack_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -102,7 +102,7 @@ func (x *ConnectResponse) String() string { func (*ConnectResponse) ProtoMessage() {} func (x *ConnectResponse) ProtoReflect() protoreflect.Message { - mi := &file_cloudslack_proto_msgTypes[1] + mi := &file_cloud_slack_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -115,7 +115,7 @@ func (x *ConnectResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ConnectResponse.ProtoReflect.Descriptor instead. func (*ConnectResponse) Descriptor() ([]byte, []int) { - return file_cloudslack_proto_rawDescGZIP(), []int{1} + return file_cloud_slack_proto_rawDescGZIP(), []int{1} } func (x *ConnectResponse) GetEvent() []byte { @@ -132,47 +132,47 @@ func (x *ConnectResponse) GetError() []byte { return nil } -var File_cloudslack_proto protoreflect.FileDescriptor - -var file_cloudslack_proto_rawDesc = []byte{ - 0x0a, 0x10, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x12, 0x0a, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x22, 0x46, - 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, - 0x12, 0x14, 0x0a, 0x05, 0x62, 0x6f, 0x74, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x62, 0x6f, 0x74, 0x49, 0x64, 0x22, 0x3d, 0x0a, 0x0f, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, - 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x76, 0x65, - 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, - 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x32, 0x56, 0x0a, 0x0a, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x53, 0x6c, - 0x61, 0x63, 0x6b, 0x12, 0x48, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x1a, - 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, - 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x63, 0x6c, 0x6f, - 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x14, 0x5a, - 0x12, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, - 0x61, 0x63, 0x6b, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +var File_cloud_slack_proto protoreflect.FileDescriptor + +var file_cloud_slack_proto_rawDesc = []byte{ + 0x0a, 0x11, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x5f, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x22, + 0x46, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, + 0x64, 0x12, 0x14, 0x0a, 0x05, 0x62, 0x6f, 0x74, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x62, 0x6f, 0x74, 0x49, 0x64, 0x22, 0x3d, 0x0a, 0x0f, 0x43, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, + 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x32, 0x56, 0x0a, 0x0a, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x53, + 0x6c, 0x61, 0x63, 0x6b, 0x12, 0x48, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, + 0x1a, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x2e, 0x43, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x63, 0x6c, + 0x6f, 0x75, 0x64, 0x73, 0x6c, 0x61, 0x63, 0x6b, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x14, + 0x5a, 0x12, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x73, + 0x6c, 0x61, 0x63, 0x6b, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( - file_cloudslack_proto_rawDescOnce sync.Once - file_cloudslack_proto_rawDescData = file_cloudslack_proto_rawDesc + file_cloud_slack_proto_rawDescOnce sync.Once + file_cloud_slack_proto_rawDescData = file_cloud_slack_proto_rawDesc ) -func file_cloudslack_proto_rawDescGZIP() []byte { - file_cloudslack_proto_rawDescOnce.Do(func() { - file_cloudslack_proto_rawDescData = protoimpl.X.CompressGZIP(file_cloudslack_proto_rawDescData) +func file_cloud_slack_proto_rawDescGZIP() []byte { + file_cloud_slack_proto_rawDescOnce.Do(func() { + file_cloud_slack_proto_rawDescData = protoimpl.X.CompressGZIP(file_cloud_slack_proto_rawDescData) }) - return file_cloudslack_proto_rawDescData + return file_cloud_slack_proto_rawDescData } -var file_cloudslack_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_cloudslack_proto_goTypes = []interface{}{ +var file_cloud_slack_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_cloud_slack_proto_goTypes = []interface{}{ (*ConnectRequest)(nil), // 0: cloudslack.ConnectRequest (*ConnectResponse)(nil), // 1: cloudslack.ConnectResponse } -var file_cloudslack_proto_depIdxs = []int32{ +var file_cloud_slack_proto_depIdxs = []int32{ 0, // 0: cloudslack.CloudSlack.Connect:input_type -> cloudslack.ConnectRequest 1, // 1: cloudslack.CloudSlack.Connect:output_type -> cloudslack.ConnectResponse 1, // [1:2] is the sub-list for method output_type @@ -182,13 +182,13 @@ var file_cloudslack_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for field type_name } -func init() { file_cloudslack_proto_init() } -func file_cloudslack_proto_init() { - if File_cloudslack_proto != nil { +func init() { file_cloud_slack_proto_init() } +func file_cloud_slack_proto_init() { + if File_cloud_slack_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_cloudslack_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_cloud_slack_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ConnectRequest); i { case 0: return &v.state @@ -200,7 +200,7 @@ func file_cloudslack_proto_init() { return nil } } - file_cloudslack_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_cloud_slack_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ConnectResponse); i { case 0: return &v.state @@ -217,18 +217,18 @@ func file_cloudslack_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_cloudslack_proto_rawDesc, + RawDescriptor: file_cloud_slack_proto_rawDesc, NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 1, }, - GoTypes: file_cloudslack_proto_goTypes, - DependencyIndexes: file_cloudslack_proto_depIdxs, - MessageInfos: file_cloudslack_proto_msgTypes, + GoTypes: file_cloud_slack_proto_goTypes, + DependencyIndexes: file_cloud_slack_proto_depIdxs, + MessageInfos: file_cloud_slack_proto_msgTypes, }.Build() - File_cloudslack_proto = out.File - file_cloudslack_proto_rawDesc = nil - file_cloudslack_proto_goTypes = nil - file_cloudslack_proto_depIdxs = nil + File_cloud_slack_proto = out.File + file_cloud_slack_proto_rawDesc = nil + file_cloud_slack_proto_goTypes = nil + file_cloud_slack_proto_depIdxs = nil } diff --git a/pkg/api/cloudslack/cloudslack_grpc.pb.go b/pkg/api/cloudslack/cloud_slack_grpc.pb.go similarity index 98% rename from pkg/api/cloudslack/cloudslack_grpc.pb.go rename to pkg/api/cloudslack/cloud_slack_grpc.pb.go index 3e9454bb3..3d7439078 100644 --- a/pkg/api/cloudslack/cloudslack_grpc.pb.go +++ b/pkg/api/cloudslack/cloud_slack_grpc.pb.go @@ -2,7 +2,7 @@ // versions: // - protoc-gen-go-grpc v1.3.0 // - protoc v4.24.0 -// source: cloudslack.proto +// source: cloud_slack.proto package cloudslack @@ -137,5 +137,5 @@ var CloudSlack_ServiceDesc = grpc.ServiceDesc{ ClientStreams: true, }, }, - Metadata: "cloudslack.proto", + Metadata: "cloud_slack.proto", } diff --git a/pkg/api/cloudteams/cloud_teams.pb.go b/pkg/api/cloudteams/cloud_teams.pb.go new file mode 100644 index 000000000..e502131f9 --- /dev/null +++ b/pkg/api/cloudteams/cloud_teams.pb.go @@ -0,0 +1,349 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.24.0 +// source: cloud_teams.proto + +package cloudteams + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type AgentActivity struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Req: + // + // *AgentActivity_InstanceId + // *AgentActivity_Message + Req isAgentActivity_Req `protobuf_oneof:"req"` +} + +func (x *AgentActivity) Reset() { + *x = AgentActivity{} + if protoimpl.UnsafeEnabled { + mi := &file_cloud_teams_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AgentActivity) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AgentActivity) ProtoMessage() {} + +func (x *AgentActivity) ProtoReflect() protoreflect.Message { + mi := &file_cloud_teams_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AgentActivity.ProtoReflect.Descriptor instead. +func (*AgentActivity) Descriptor() ([]byte, []int) { + return file_cloud_teams_proto_rawDescGZIP(), []int{0} +} + +func (m *AgentActivity) GetReq() isAgentActivity_Req { + if m != nil { + return m.Req + } + return nil +} + +func (x *AgentActivity) GetInstanceId() string { + if x, ok := x.GetReq().(*AgentActivity_InstanceId); ok { + return x.InstanceId + } + return "" +} + +func (x *AgentActivity) GetMessage() *Message { + if x, ok := x.GetReq().(*AgentActivity_Message); ok { + return x.Message + } + return nil +} + +type isAgentActivity_Req interface { + isAgentActivity_Req() +} + +type AgentActivity_InstanceId struct { + InstanceId string `protobuf:"bytes,1,opt,name=instanceId,proto3,oneof"` +} + +type AgentActivity_Message struct { + // message sent by agent, either as a response to a command or an event from enabled sources + Message *Message `protobuf:"bytes,2,opt,name=message,proto3,oneof"` +} + +func (*AgentActivity_InstanceId) isAgentActivity_Req() {} + +func (*AgentActivity_Message) isAgentActivity_Req() {} + +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TeamId string `protobuf:"bytes,1,opt,name=teamId,proto3" json:"teamId,omitempty"` + ActivityId string `protobuf:"bytes,2,opt,name=activityId,proto3" json:"activityId,omitempty"` + ConversationId string `protobuf:"bytes,3,opt,name=conversationId,proto3" json:"conversationId,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_cloud_teams_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_cloud_teams_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_cloud_teams_proto_rawDescGZIP(), []int{1} +} + +func (x *Message) GetTeamId() string { + if x != nil { + return x.TeamId + } + return "" +} + +func (x *Message) GetActivityId() string { + if x != nil { + return x.ActivityId + } + return "" +} + +func (x *Message) GetConversationId() string { + if x != nil { + return x.ConversationId + } + return "" +} + +func (x *Message) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +type CloudActivity struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Event []byte `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"` +} + +func (x *CloudActivity) Reset() { + *x = CloudActivity{} + if protoimpl.UnsafeEnabled { + mi := &file_cloud_teams_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CloudActivity) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CloudActivity) ProtoMessage() {} + +func (x *CloudActivity) ProtoReflect() protoreflect.Message { + mi := &file_cloud_teams_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CloudActivity.ProtoReflect.Descriptor instead. +func (*CloudActivity) Descriptor() ([]byte, []int) { + return file_cloud_teams_proto_rawDescGZIP(), []int{2} +} + +func (x *CloudActivity) GetEvent() []byte { + if x != nil { + return x.Event + } + return nil +} + +var File_cloud_teams_proto protoreflect.FileDescriptor + +var file_cloud_teams_proto_rawDesc = []byte{ + 0x0a, 0x11, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x5f, 0x74, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x74, 0x65, 0x61, 0x6d, 0x73, 0x22, + 0x69, 0x0a, 0x0d, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, + 0x12, 0x20, 0x0a, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, + 0x49, 0x64, 0x12, 0x2f, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x74, 0x65, 0x61, 0x6d, 0x73, + 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x42, 0x05, 0x0a, 0x03, 0x72, 0x65, 0x71, 0x22, 0x7d, 0x0a, 0x07, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x65, 0x61, 0x6d, 0x49, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x65, 0x61, 0x6d, 0x49, 0x64, 0x12, 0x1e, 0x0a, + 0x0a, 0x61, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0a, 0x61, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x49, 0x64, 0x12, 0x26, 0x0a, + 0x0e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x73, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x25, 0x0a, 0x0d, 0x43, 0x6c, 0x6f, + 0x75, 0x64, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, + 0x32, 0x5a, 0x0a, 0x0a, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x54, 0x65, 0x61, 0x6d, 0x73, 0x12, 0x4c, + 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, + 0x12, 0x19, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x74, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x41, 0x67, + 0x65, 0x6e, 0x74, 0x41, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x1a, 0x19, 0x2e, 0x63, 0x6c, + 0x6f, 0x75, 0x64, 0x74, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x41, 0x63, + 0x74, 0x69, 0x76, 0x69, 0x74, 0x79, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x14, 0x5a, 0x12, + 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x74, 0x65, 0x61, + 0x6d, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_cloud_teams_proto_rawDescOnce sync.Once + file_cloud_teams_proto_rawDescData = file_cloud_teams_proto_rawDesc +) + +func file_cloud_teams_proto_rawDescGZIP() []byte { + file_cloud_teams_proto_rawDescOnce.Do(func() { + file_cloud_teams_proto_rawDescData = protoimpl.X.CompressGZIP(file_cloud_teams_proto_rawDescData) + }) + return file_cloud_teams_proto_rawDescData +} + +var file_cloud_teams_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_cloud_teams_proto_goTypes = []interface{}{ + (*AgentActivity)(nil), // 0: cloudteams.AgentActivity + (*Message)(nil), // 1: cloudteams.Message + (*CloudActivity)(nil), // 2: cloudteams.CloudActivity +} +var file_cloud_teams_proto_depIdxs = []int32{ + 1, // 0: cloudteams.AgentActivity.message:type_name -> cloudteams.Message + 0, // 1: cloudteams.CloudTeams.StreamActivity:input_type -> cloudteams.AgentActivity + 2, // 2: cloudteams.CloudTeams.StreamActivity:output_type -> cloudteams.CloudActivity + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_cloud_teams_proto_init() } +func file_cloud_teams_proto_init() { + if File_cloud_teams_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_cloud_teams_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AgentActivity); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cloud_teams_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cloud_teams_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CloudActivity); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_cloud_teams_proto_msgTypes[0].OneofWrappers = []interface{}{ + (*AgentActivity_InstanceId)(nil), + (*AgentActivity_Message)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_cloud_teams_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_cloud_teams_proto_goTypes, + DependencyIndexes: file_cloud_teams_proto_depIdxs, + MessageInfos: file_cloud_teams_proto_msgTypes, + }.Build() + File_cloud_teams_proto = out.File + file_cloud_teams_proto_rawDesc = nil + file_cloud_teams_proto_goTypes = nil + file_cloud_teams_proto_depIdxs = nil +} diff --git a/pkg/api/cloudteams/cloud_teams_grpc.pb.go b/pkg/api/cloudteams/cloud_teams_grpc.pb.go new file mode 100644 index 000000000..1f6d8d834 --- /dev/null +++ b/pkg/api/cloudteams/cloud_teams_grpc.pb.go @@ -0,0 +1,141 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.24.0 +// source: cloud_teams.proto + +package cloudteams + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + CloudTeams_StreamActivity_FullMethodName = "/cloudteams.CloudTeams/StreamActivity" +) + +// CloudTeamsClient is the client API for CloudTeams service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CloudTeamsClient interface { + StreamActivity(ctx context.Context, opts ...grpc.CallOption) (CloudTeams_StreamActivityClient, error) +} + +type cloudTeamsClient struct { + cc grpc.ClientConnInterface +} + +func NewCloudTeamsClient(cc grpc.ClientConnInterface) CloudTeamsClient { + return &cloudTeamsClient{cc} +} + +func (c *cloudTeamsClient) StreamActivity(ctx context.Context, opts ...grpc.CallOption) (CloudTeams_StreamActivityClient, error) { + stream, err := c.cc.NewStream(ctx, &CloudTeams_ServiceDesc.Streams[0], CloudTeams_StreamActivity_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &cloudTeamsStreamActivityClient{stream} + return x, nil +} + +type CloudTeams_StreamActivityClient interface { + Send(*AgentActivity) error + Recv() (*CloudActivity, error) + grpc.ClientStream +} + +type cloudTeamsStreamActivityClient struct { + grpc.ClientStream +} + +func (x *cloudTeamsStreamActivityClient) Send(m *AgentActivity) error { + return x.ClientStream.SendMsg(m) +} + +func (x *cloudTeamsStreamActivityClient) Recv() (*CloudActivity, error) { + m := new(CloudActivity) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// CloudTeamsServer is the server API for CloudTeams service. +// All implementations must embed UnimplementedCloudTeamsServer +// for forward compatibility +type CloudTeamsServer interface { + StreamActivity(CloudTeams_StreamActivityServer) error + mustEmbedUnimplementedCloudTeamsServer() +} + +// UnimplementedCloudTeamsServer must be embedded to have forward compatible implementations. +type UnimplementedCloudTeamsServer struct { +} + +func (UnimplementedCloudTeamsServer) StreamActivity(CloudTeams_StreamActivityServer) error { + return status.Errorf(codes.Unimplemented, "method StreamActivity not implemented") +} +func (UnimplementedCloudTeamsServer) mustEmbedUnimplementedCloudTeamsServer() {} + +// UnsafeCloudTeamsServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CloudTeamsServer will +// result in compilation errors. +type UnsafeCloudTeamsServer interface { + mustEmbedUnimplementedCloudTeamsServer() +} + +func RegisterCloudTeamsServer(s grpc.ServiceRegistrar, srv CloudTeamsServer) { + s.RegisterService(&CloudTeams_ServiceDesc, srv) +} + +func _CloudTeams_StreamActivity_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(CloudTeamsServer).StreamActivity(&cloudTeamsStreamActivityServer{stream}) +} + +type CloudTeams_StreamActivityServer interface { + Send(*CloudActivity) error + Recv() (*AgentActivity, error) + grpc.ServerStream +} + +type cloudTeamsStreamActivityServer struct { + grpc.ServerStream +} + +func (x *cloudTeamsStreamActivityServer) Send(m *CloudActivity) error { + return x.ServerStream.SendMsg(m) +} + +func (x *cloudTeamsStreamActivityServer) Recv() (*AgentActivity, error) { + m := new(AgentActivity) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// CloudTeams_ServiceDesc is the grpc.ServiceDesc for CloudTeams service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var CloudTeams_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "cloudteams.CloudTeams", + HandlerType: (*CloudTeamsServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "StreamActivity", + Handler: _CloudTeams_StreamActivity_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "cloud_teams.proto", +} diff --git a/pkg/bot/bot.go b/pkg/bot/bot.go index 6616aa27e..cd17a0ee1 100644 --- a/pkg/bot/bot.go +++ b/pkg/bot/bot.go @@ -3,6 +3,7 @@ package bot import ( "context" + "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/internal/health" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/execute" @@ -49,6 +50,12 @@ type FatalErrorAnalyticsReporter interface { Close() error } +// AnalyticsCommandReporter defines a reporter that collects analytics data. +type AnalyticsCommandReporter interface { + FatalErrorAnalyticsReporter + ReportCommand(in analytics.ReportCommandInput) error +} + type channelConfigByID struct { config.ChannelBindingsByID diff --git a/pkg/bot/slack_cloud.go b/pkg/bot/slack_cloud.go index 0c8a12b04..34ceb5887 100644 --- a/pkg/bot/slack_cloud.go +++ b/pkg/bot/slack_cloud.go @@ -20,13 +20,13 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/internal/config/remote" "github.com/kubeshop/botkube/internal/health" "github.com/kubeshop/botkube/pkg/api" + "github.com/kubeshop/botkube/pkg/api/cloudplatform" pb "github.com/kubeshop/botkube/pkg/api/cloudslack" "github.com/kubeshop/botkube/pkg/bot/interactive" "github.com/kubeshop/botkube/pkg/config" @@ -38,8 +38,6 @@ import ( ) const ( - APIKeyContextKey = "X-Api-Key" // #nosec - DeploymentIDContextKey = "X-Deployment-Id" // #nosec retryDelay = time.Second maxRetries = 30 successIntervalDuration = 3 * time.Minute @@ -54,7 +52,7 @@ type CloudSlack struct { cfg config.CloudSlack client *slack.Client executorFactory ExecutorFactory - reporter cloudSlackAnalyticsReporter + reporter AnalyticsCommandReporter commGroupMetadata CommGroupMetadata realNamesForID map[string]string botMentionRegex *regexp.Regexp @@ -71,18 +69,12 @@ type CloudSlack struct { reportOnce sync.Once } -// cloudSlackAnalyticsReporter defines a reporter that collects analytics data. -type cloudSlackAnalyticsReporter interface { - FatalErrorAnalyticsReporter - ReportCommand(in analytics.ReportCommandInput) error -} - func NewCloudSlack(log logrus.FieldLogger, commGroupMetadata CommGroupMetadata, cfg config.CloudSlack, clusterName string, executorFactory ExecutorFactory, - reporter cloudSlackAnalyticsReporter) (*CloudSlack, error) { + reporter AnalyticsCommandReporter) (*CloudSlack, error) { client := slack.New(cfg.Token) _, err := client.AuthTest() @@ -173,10 +165,15 @@ func (b *CloudSlack) start(ctx context.Context) error { messages := make(chan *pb.ConnectResponse, platformMessageChannelSize) defer b.shutdown(messageWorkers, messages) + remoteConfig, ok := remote.GetConfig() + if !ok { + return fmt.Errorf("while getting remote config for %s", config.CloudSlackCommPlatformIntegration) + } + creds := grpc.WithTransportCredentials(insecure.NewCredentials()) opts := []grpc.DialOption{creds, - grpc.WithStreamInterceptor(b.addStreamingClientCredentials()), - grpc.WithUnaryInterceptor(b.addUnaryClientCredentials()), + grpc.WithStreamInterceptor(cloudplatform.AddStreamingClientCredentials(remoteConfig)), + grpc.WithUnaryInterceptor(cloudplatform.AddUnaryClientCredentials(remoteConfig)), } conn, err := grpc.Dial(b.cfg.Server.URL, opts...) @@ -185,11 +182,6 @@ func (b *CloudSlack) start(ctx context.Context) error { } defer conn.Close() - remoteConfig, ok := remote.GetConfig() - if !ok { - return fmt.Errorf("while getting remote config for %s", config.CloudSlackCommPlatformIntegration) - } - req := &pb.ConnectRequest{ InstanceId: remoteConfig.Identifier, BotId: b.botID, @@ -683,44 +675,6 @@ func (b *CloudSlack) getChannelsToNotify(sourceBindings []string) []string { return out } -func (b *CloudSlack) addStreamingClientCredentials() grpc.StreamClientInterceptor { - return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - remoteCfg, ok := remote.GetConfig() - if !ok { - return nil, errors.New("empty remote configuration") - } - md := metadata.New(map[string]string{ - APIKeyContextKey: remoteCfg.APIKey, - DeploymentIDContextKey: remoteCfg.Identifier, - }) - - ctx = metadata.NewOutgoingContext(ctx, md) - - clientStream, err := streamer(ctx, desc, cc, method, opts...) - if err != nil { - return nil, err - } - - return clientStream, nil - } -} - -func (b *CloudSlack) addUnaryClientCredentials() grpc.UnaryClientInterceptor { - return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - remoteCfg, ok := remote.GetConfig() - if !ok { - return errors.New("empty remote configuration") - } - md := metadata.New(map[string]string{ - APIKeyContextKey: remoteCfg.APIKey, - DeploymentIDContextKey: remoteCfg.Identifier, - }) - - ctx = metadata.NewOutgoingContext(ctx, md) - return invoker(ctx, method, req, reply, cc, opts...) - } -} - func (b *CloudSlack) checkStreamingError(data []byte) error { if len(data) == 0 { return nil diff --git a/pkg/bot/teams_cloud.go b/pkg/bot/teams_cloud.go new file mode 100644 index 000000000..1e8089075 --- /dev/null +++ b/pkg/bot/teams_cloud.go @@ -0,0 +1,421 @@ +package bot + +import ( + "context" + "encoding/json" + "fmt" + "regexp" + "sync" + "time" + + "github.com/avast/retry-go/v4" + "github.com/infracloudio/msbotbuilder-go/core/activity" + "github.com/infracloudio/msbotbuilder-go/schema" + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" + + "github.com/kubeshop/botkube/internal/health" + "github.com/kubeshop/botkube/pkg/api" + pb "github.com/kubeshop/botkube/pkg/api/cloudteams" + "github.com/kubeshop/botkube/pkg/bot/interactive" + "github.com/kubeshop/botkube/pkg/config" + "github.com/kubeshop/botkube/pkg/execute" + "github.com/kubeshop/botkube/pkg/execute/command" + "github.com/kubeshop/botkube/pkg/formatx" + "github.com/kubeshop/botkube/pkg/multierror" + "github.com/kubeshop/botkube/pkg/sliceutil" +) + +const channelIDKeyName = "teamsChannelId" + +var _ Bot = &CloudTeams{} + +// CloudTeams listens for user's messages, execute commands and sends back the response. +// It sends also source notifications. +// +// User message (executors) flow: +// +// +-------------+ +-------------+ +-------------+ +// | MS Teams' | REST | Cloud | REST | Pub/Sub | +// | message +------>| router +----->| #bot | +// +-------------+ +-------------+ +-------+-----+ +// v +// +-------------+ +-------------+ +-------------+ +// | Pub/Sub | gRPC | Agent | gRPC | Cloud | +// | #agent |<------+ CloudTeams |<-----+ processor | +// +------+------+ +-------------+ +-------------+ +// | +// +------v------+ +-------------+ +// | Cloud | REST | MS Teams' | +// | processor |+----->| channel | +// +-------------+ +-------------+ +// +// Notification (sources) flow: +// +// +-------------+ +-------------+ +-------------+ +// | Source | gRPC | Agent | gRPC | Cloud | +// | message +------>| CloudTeams +----->| router | +// +-------------+ +-------------+ +-------+-----+ +// v +// +-------------+ +-------------+ +-------------+ +// | MS Teams' | REST | Cloud | REST | Pub/Sub | +// | channel |<------+ processor |<-----+ #agent | +// +------+------+ +-------------+ +-------------+ +type CloudTeams struct { + log logrus.FieldLogger + cfg config.CloudTeams + executorFactory ExecutorFactory + reporter AnalyticsCommandReporter + commGroupMetadata CommGroupMetadata + notifyMutex sync.Mutex + clusterName string + status health.PlatformStatusMsg + failuresNo int + failureReason health.FailureReasonMsg + reportOnce sync.Once + botMentionRegex *regexp.Regexp + botName string + agentActivityMessage chan *pb.AgentActivity + channelsMutex sync.RWMutex + channels map[string]teamsCloudChannelConfigByID +} + +// NewCloudTeams returns a new CloudTeams instance. +func NewCloudTeams( + log logrus.FieldLogger, + commGroupMetadata CommGroupMetadata, + cfg config.CloudTeams, + clusterName string, + executorFactory ExecutorFactory, + reporter AnalyticsCommandReporter) (*CloudTeams, error) { + botMentionRegex, err := teamsBotMentionRegex(cfg.BotName) + if err != nil { + return nil, err + } + + return &CloudTeams{ + log: log, + executorFactory: executorFactory, + reporter: reporter, + cfg: cfg, + botName: cfg.BotName, + channels: teamsCloudChannelsConfig(cfg.Teams), + commGroupMetadata: commGroupMetadata, + clusterName: clusterName, + botMentionRegex: botMentionRegex, + status: health.StatusUnknown, + agentActivityMessage: make(chan *pb.AgentActivity, platformMessageChannelSize), + }, nil +} + +// Start MS Teams server to serve messages from Teams client +func (b *CloudTeams) Start(ctx context.Context) error { + return b.withRetries(ctx, b.log, maxRetries, func() error { + return b.start(ctx) + }) +} + +// SendMessageToAll sends the message to MS CloudTeams to all conversations. +func (b *CloudTeams) SendMessageToAll(ctx context.Context, msg interactive.CoreMessage) error { + return b.sendAgentActivity(ctx, msg, b.getChannelsToNotify(nil)) +} + +// SendMessage sends the message to MS CloudTeams to selected conversations. +func (b *CloudTeams) SendMessage(ctx context.Context, msg interactive.CoreMessage, sourceBindings []string) error { + return b.sendAgentActivity(ctx, msg, b.getChannelsToNotify(sourceBindings)) +} + +// IntegrationName describes the integration name. +func (b *CloudTeams) IntegrationName() config.CommPlatformIntegration { + return config.CloudTeamsCommPlatformIntegration +} + +// NotificationsEnabled returns current notification status for a given channel ID. +func (b *CloudTeams) NotificationsEnabled(channelID string) bool { + channel, exists := b.getChannels()[channelID] + if !exists { + return false + } + + return channel.notify +} + +// SetNotificationsEnabled sets a new notification status for a given channel ID. +func (b *CloudTeams) SetNotificationsEnabled(channelID string, enabled bool) error { + // avoid race conditions with using the setter concurrently, as we set a whole map + b.notifyMutex.Lock() + defer b.notifyMutex.Unlock() + + channels := b.getChannels() + channel, exists := channels[channelID] + if !exists { + return execute.ErrNotificationsNotConfigured + } + + channel.notify = enabled + channels[channelID] = channel + b.setChannels(channels) + + return nil +} + +// Type describes the integration type. +func (b *CloudTeams) Type() config.IntegrationType { + return config.BotIntegrationType +} + +// BotName returns the Bot name. +func (b *CloudTeams) BotName() string { + return fmt.Sprintf("%s", b.botName) +} + +// GetStatus gets bot status. +func (b *CloudTeams) GetStatus() health.PlatformStatus { + return health.PlatformStatus{ + Status: b.status, + Restarts: fmt.Sprintf("%d/%d", b.failuresNo, maxRetries), + Reason: b.failureReason, + } +} + +func (b *CloudTeams) start(ctx context.Context) error { + svc, err := newGrpcCloudTeamsConnector(b.log, b.cfg.Server.URL) + if err != nil { + return err + } + defer svc.Shutdown() + + err = svc.Start(ctx) + if err != nil { + return err + } + + b.setFailureReason("") + b.reportOnce.Do(func() { + if err := b.reporter.ReportBotEnabled(b.IntegrationName(), b.commGroupMetadata.Index); err != nil { + b.log.Errorf("report analytics error: %s", err.Error()) + } + b.log.Info("Botkube connected to Cloud Teams!") + }) + + parallel, ctx := errgroup.WithContext(ctx) + parallel.Go(func() error { + return svc.ProcessCloudActivity(ctx, b.handleStreamMessage) + }) + parallel.Go(func() error { + return svc.ProcessAgentActivity(ctx, b.agentActivityMessage) + }) + + return parallel.Wait() +} + +func (b *CloudTeams) withRetries(ctx context.Context, log logrus.FieldLogger, maxRetries int, fn func() error) error { + b.failuresNo = 0 + var lastFailureTimestamp time.Time + return retry.Do( + func() error { + err := fn() + if err != nil { + if !lastFailureTimestamp.IsZero() && time.Since(lastFailureTimestamp) >= successIntervalDuration { + // if the last run was long enough, we treat is as success, so we reset failures + log.Infof("Resetting failures counter as last failure was more than %s ago", successIntervalDuration) + b.failuresNo = 0 + } + + if b.failuresNo >= maxRetries { + b.setFailureReason(health.FailureReasonMaxRetriesExceeded) + log.Debugf("Reached max number of %d retries: %s", maxRetries, err) + return retry.Unrecoverable(err) + } + + lastFailureTimestamp = time.Now() + b.failuresNo++ + b.setFailureReason(health.FailureReasonConnectionError) + return err + } + b.setFailureReason("") + return nil + }, + retry.OnRetry(func(_ uint, err error) { + log.Warnf("Retrying Cloud Teams startup (attempt no %d/%d): %s", b.failuresNo, maxRetries, err) + }), + retry.Delay(retryDelay), + retry.Attempts(0), // infinite, we cancel that by our own + retry.LastErrorOnly(true), + retry.Context(ctx), + ) +} + +func (b *CloudTeams) handleStreamMessage(ctx context.Context, data *pb.CloudActivity) (*pb.AgentActivity, error) { + b.setFailureReason("") + var act schema.Activity + err := json.Unmarshal(data.Event, &act) + if err != nil { + return nil, fmt.Errorf("while unmarshaling activity event: %w", err) + } + switch act.Type { + case schema.Message: + b.log.WithField("message", formatx.StructDumper().Sdump(act)).Debug("Processing Cloud message...") + channel, exists := b.getChannelForActivity(act) + + msg := b.processMessage(ctx, act, channel, exists) + if msg.IsEmpty() { + return nil, nil + } + + raw, err := json.Marshal(msg.Message) + if err != nil { + return nil, err + } + + conversationRef := activity.GetCoversationReference(act) + return &pb.AgentActivity{ + Req: &pb.AgentActivity_Message{ + Message: &pb.Message{ + TeamId: channel.teamID, + ConversationId: conversationRef.Conversation.ID, + ActivityId: conversationRef.ActivityID, // activity ID allows us to send it as a thread message + Data: raw, + }, + }, + }, nil + default: + return nil, fmt.Errorf("activity type %s not supported yet", act.Type) + } +} + +func (b *CloudTeams) processMessage(ctx context.Context, act schema.Activity, channel teamsCloudChannelConfigByID, exists bool) interactive.CoreMessage { + trimmedMsg := b.trimBotMention(act.Text) + + e := b.executorFactory.NewDefault(execute.NewDefaultInput{ + CommGroupName: b.commGroupMetadata.Name, + Platform: b.IntegrationName(), + NotifierHandler: b, + Conversation: execute.Conversation{ + Alias: channel.alias, + IsKnown: exists, + ID: channel.Identifier(), + ExecutorBindings: channel.Bindings.Executors, + SourceBindings: channel.Bindings.Sources, + CommandOrigin: command.TypedOrigin, + }, + Message: trimmedMsg, + User: execute.UserInput{ + //Mention: "", // TODO(https://github.com/kubeshop/botkube-cloud/issues/677): set when adding interactivity support. + DisplayName: act.From.Name, + }, + }) + return e.Execute(ctx) +} + +func (b *CloudTeams) sendAgentActivity(ctx context.Context, msg interactive.CoreMessage, channels []teamsCloudChannelConfigByID) error { + errs := multierror.New() + for _, channel := range channels { + b.log.Debugf("Sending message to channel %q: %+v", channel.ID, msg) + + msg.ReplaceBotNamePlaceholder(b.BotName(), api.BotNameWithClusterName(b.clusterName)) + + raw, err := json.Marshal(msg.Message) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("while proxing message via agent for channel id %q: %w", channel.ID, err)) + continue + } + + act := &pb.AgentActivity{ + Req: &pb.AgentActivity_Message{ + Message: &pb.Message{ + TeamId: channel.teamID, + ActivityId: "", // empty so it will be sent on root instead of sending as a thread message + ConversationId: channel.ID, + Data: raw, + }, + }, + } + + select { + case <-ctx.Done(): + return ctx.Err() + case b.agentActivityMessage <- act: + } + } + return errs.ErrorOrNil() +} + +func (b *CloudTeams) getChannelForActivity(act schema.Activity) (teamsCloudChannelConfigByID, bool) { + rawChannelID, exists := act.ChannelData[channelIDKeyName] + if !exists { + return teamsCloudChannelConfigByID{}, false + } + + channelID, ok := rawChannelID.(string) + if !ok { + return teamsCloudChannelConfigByID{}, false + } + + channel, exists := b.getChannels()[channelID] + return channel, exists +} + +func (b *CloudTeams) getChannelsToNotify(sourceBindings []string) []teamsCloudChannelConfigByID { + var out []teamsCloudChannelConfigByID + for _, cfg := range b.getChannels() { + if !cfg.notify { + b.log.Infof("Skipping notification for channel %q as notifications are disabled.", cfg.Identifier()) + continue + } + + if sourceBindings != nil && !sliceutil.Intersect(sourceBindings, cfg.Bindings.Sources) { + continue + } + + out = append(out, cfg) + } + return out +} + +func (b *CloudTeams) getChannels() map[string]teamsCloudChannelConfigByID { + b.channelsMutex.RLock() + defer b.channelsMutex.RUnlock() + return b.channels +} + +func (b *CloudTeams) setChannels(channels map[string]teamsCloudChannelConfigByID) { + b.channelsMutex.Lock() + defer b.channelsMutex.Unlock() + b.channels = channels +} + +func (b *CloudTeams) trimBotMention(msg string) string { + return b.botMentionRegex.ReplaceAllString(msg, "") +} + +func (b *CloudTeams) setFailureReason(reason health.FailureReasonMsg) { + if reason == "" { + b.status = health.StatusHealthy + } else { + b.status = health.StatusUnHealthy + } + b.failureReason = reason +} + +type teamsCloudChannelConfigByID struct { + config.ChannelBindingsByID + alias string + notify bool + teamID string +} + +func teamsCloudChannelsConfig(teams []config.TeamsBindings) map[string]teamsCloudChannelConfigByID { + out := make(map[string]teamsCloudChannelConfigByID) + for _, team := range teams { + for alias, channel := range team.Channels { + out[channel.Identifier()] = teamsCloudChannelConfigByID{ + ChannelBindingsByID: channel, + alias: alias, + notify: !channel.Notification.Disabled, + teamID: team.ID, + } + } + } + return out +} diff --git a/pkg/bot/teams_cloud_grpc.go b/pkg/bot/teams_cloud_grpc.go new file mode 100644 index 000000000..4be833ebf --- /dev/null +++ b/pkg/bot/teams_cloud_grpc.go @@ -0,0 +1,178 @@ +package bot + +import ( + "context" + "errors" + "fmt" + "io" + + "github.com/sirupsen/logrus" + "github.com/sourcegraph/conc/pool" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + + "github.com/kubeshop/botkube/internal/config/remote" + "github.com/kubeshop/botkube/pkg/api/cloudplatform" + pb "github.com/kubeshop/botkube/pkg/api/cloudteams" +) + +type grpcCloudTeamsConnector struct { + log logrus.FieldLogger + grpcConn *grpc.ClientConn + remoteConfig remote.Config + + agentActivityWorkers *pool.Pool + cloudActivityWorkers *pool.Pool + + activityClient pb.CloudTeams_StreamActivityClient +} + +func newGrpcCloudTeamsConnector(log logrus.FieldLogger, url string) (*grpcCloudTeamsConnector, error) { + remoteConfig, _ := remote.GetConfig() + //if !ok { TODO: enable when config generator will work + // return nil, fmt.Errorf("while getting remote config for %q", config.CloudTeamsCommPlatformIntegration) + //} + creds := grpc.WithTransportCredentials(insecure.NewCredentials()) + opts := []grpc.DialOption{ + creds, + grpc.WithStreamInterceptor(cloudplatform.AddStreamingClientCredentials(remoteConfig)), + grpc.WithUnaryInterceptor(cloudplatform.AddUnaryClientCredentials(remoteConfig)), + } + + conn, err := grpc.Dial(url, opts...) + if err != nil { + return nil, fmt.Errorf("while creating gRCP connection: %w", err) + } + + return &grpcCloudTeamsConnector{ + log: log, + grpcConn: conn, + remoteConfig: remoteConfig, + + cloudActivityWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), + agentActivityWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), + }, nil +} + +func (c *grpcCloudTeamsConnector) Shutdown() { + c.log.Info("Shutting down Cloud Teams message processor...") + + if c.activityClient != nil { + if err := c.activityClient.CloseSend(); err != nil { + c.log.WithError(err).Error("Cannot closing gRPC stream activity connection") + } + } + + if err := c.grpcConn.Close(); err != nil { + c.log.WithError(err).Error("Cannot close gRPC connection") + } + + c.cloudActivityWorkers.Wait() + c.agentActivityWorkers.Wait() +} + +func (c *grpcCloudTeamsConnector) Start(ctx context.Context) error { + activityClient, err := pb.NewCloudTeamsClient(c.grpcConn).StreamActivity(ctx) + if err != nil { + return fmt.Errorf("while initializing gRPC cloud client: %w", err) + } + + err = activityClient.Send(&pb.AgentActivity{ + Req: &pb.AgentActivity_InstanceId{ + //InstanceId: c.remoteConfig.Identifier, // TODO: enable when config generator will work + InstanceId: "foo-123", + }, + }) + if err != nil { + return fmt.Errorf("while sending gRPC connection request. %w", err) + } + + c.activityClient = activityClient + + return nil +} + +func (c *grpcCloudTeamsConnector) ProcessAgentActivity(ctx context.Context, agentActivity chan *pb.AgentActivity) error { + for { + select { + case <-ctx.Done(): + return nil + case msg, ok := <-agentActivity: + if !ok { + return nil + } + if msg == nil { + continue + } + c.agentActivityWorkers.Go(func() { + err := c.activityClient.Send(msg) + if err != nil { + c.log.WithError(err).Error("Failed to send Agent activity message") + return + } + }) + } + } +} + +type handleStreamFn func(context.Context, *pb.CloudActivity) (*pb.AgentActivity, error) + +func (c *grpcCloudTeamsConnector) ProcessCloudActivity(ctx context.Context, handleCloudActivityFn handleStreamFn) error { + cloudActivity := make(chan *pb.CloudActivity, platformMessageChannelSize) + + go func() { + c.log.Info("Starting Cloud Teams message processor...") + defer c.log.Info("Stopped Cloud Teams message processor...") + + for msg := range cloudActivity { + if len(msg.Event) == 0 { + continue + } + c.cloudActivityWorkers.Go(func() { + resp, err := handleCloudActivityFn(ctx, msg) + if err != nil { + c.log.WithError(err).Error("Failed to handle Cloud Teams activity") + return + } + + if resp == nil { + return + } + err = c.activityClient.Send(resp) + if err != nil { + c.log.WithError(err).Error("Failed to send response to Cloud Teams activity") + return + } + }) + } + }() + + for { + data, err := c.activityClient.Recv() + switch err { + case nil: + case io.EOF: + close(cloudActivity) + c.log.Warn("gRPC connection was closed by server") + return errors.New("gRPC connection closed") + default: + errStatus, ok := status.FromError(err) + if ok && errStatus.Code() == codes.Canceled && errStatus.Message() == context.Canceled.Error() { + c.log.Debugf("Context was cancelled...") + return errStatus.Err() + } + return fmt.Errorf("while receiving Cloud Teams events: %w", err) + } + + select { + case <-ctx.Done(): + close(cloudActivity) + c.log.Warn("shutdown requested") + return ctx.Err() + case cloudActivity <- data: + default: + } + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index a6d55b24a..710ea2017 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -87,6 +87,9 @@ const ( // TeamsCommPlatformIntegration defines Teams integration. TeamsCommPlatformIntegration CommPlatformIntegration = "teams" + // CloudTeamsCommPlatformIntegration defines Teams integration. + CloudTeamsCommPlatformIntegration CommPlatformIntegration = "cloudTeams" + // DiscordCommPlatformIntegration defines Discord integration. DiscordCommPlatformIntegration CommPlatformIntegration = "discord" @@ -178,11 +181,16 @@ type ChannelBindingsByName struct { MessageTriggers []TextMessageTriggers `yaml:"messageTriggers"` } -// Identifier returns ChannelBindingsByID identifier. +// Identifier returns ChannelBindingsByName identifier. func (c ChannelBindingsByName) Identifier() string { return c.Name } +// GetBotBindings returns associated bindings. +func (c ChannelBindingsByName) GetBotBindings() BotBindings { + return c.Bindings +} + type TextMessageTriggerEvent string const ( @@ -225,6 +233,11 @@ func (c ChannelBindingsByID) Identifier() string { return c.ID } +// GetBotBindings returns associated bindings. +func (c ChannelBindingsByID) GetBotBindings() BotBindings { + return c.Bindings +} + // BotBindings contains configuration for possible Bot bindings. type BotBindings struct { Sources []string `yaml:"sources"` @@ -448,6 +461,7 @@ type Communications struct { Mattermost Mattermost `yaml:"mattermost,omitempty"` Discord Discord `yaml:"discord,omitempty"` Teams Teams `yaml:"teams,omitempty"` + CloudTeams CloudTeams `yaml:"cloudTeams,omitempty"` Webhook Webhook `yaml:"webhook,omitempty"` Elasticsearch Elasticsearch `yaml:"elasticsearch,omitempty"` } @@ -525,15 +539,26 @@ type Mattermost struct { // Teams creds for authentication with MS Teams type Teams struct { - Enabled bool `yaml:"enabled"` - BotName string `yaml:"botName,omitempty"` - AppID string `yaml:"appID,omitempty"` - AppPassword string `yaml:"appPassword,omitempty"` - Port string `yaml:"port"` - MessagePath string `yaml:"messagePath,omitempty"` - // TODO: Be consistent with other communicators when MS Teams support multiple channels - //Channels IdentifiableMap[ChannelBindingsByName] `yaml:"channels"` - Bindings BotBindings `yaml:"bindings" validate:"required_if=Enabled true"` + Enabled bool `yaml:"enabled"` + BotName string `yaml:"botName,omitempty"` + AppID string `yaml:"appID,omitempty"` + AppPassword string `yaml:"appPassword,omitempty"` + Port string `yaml:"port"` + MessagePath string `yaml:"messagePath,omitempty"` + Bindings BotBindings `yaml:"bindings" validate:"required_if=Enabled true"` +} + +// CloudTeams configuration for cloud MS Teams. +type CloudTeams struct { + Enabled bool `yaml:"enabled"` + BotName string `yaml:"botName"` + Server GRPCServer `yaml:"server"` + Teams []TeamsBindings `yaml:"teams" validate:"required_if=Enabled true,dive,omitempty,min=1"` +} + +type TeamsBindings struct { + ID string `yaml:"id"` + Channels IdentifiableMap[ChannelBindingsByID] `yaml:"channels" validate:"dive,omitempty,min=1"` } // Discord configuration for authentication and send notifications @@ -640,10 +665,6 @@ type LoadWithDefaultsDetails struct { func LoadWithDefaults(configs [][]byte) (*Config, LoadWithDefaultsDetails, error) { k := koanf.New(configDelimiter) - for _, data := range configs { - fmt.Println(string(data)) - } - // load default settings if err := k.Load(rawbytes.Provider(defaultConfiguration), koanfyaml.Parser()); err != nil { return nil, LoadWithDefaultsDetails{}, fmt.Errorf("while loading default configuration: %w", err) diff --git a/proto/cloudslack.proto b/proto/cloud_slack.proto similarity index 100% rename from proto/cloudslack.proto rename to proto/cloud_slack.proto diff --git a/proto/cloud_teams.proto b/proto/cloud_teams.proto new file mode 100644 index 000000000..d5e6d2df7 --- /dev/null +++ b/proto/cloud_teams.proto @@ -0,0 +1,28 @@ +syntax = "proto3"; + +option go_package = "pkg/api/cloudteams"; + +package cloudteams; + +message AgentActivity { + oneof req { + string instanceId = 1; + // message sent by agent, either as a response to a command or an event from enabled sources + Message message = 2; + } +} + +message Message { + string teamId = 1; + string activityId = 2; + string conversationId = 3; + bytes data = 4; +} + +message CloudActivity { + bytes event = 1; +} + +service CloudTeams { + rpc StreamActivity(stream AgentActivity) returns (stream CloudActivity) {} +} From 8393f87ea218f31e8fcc4a05cb2bbdc80bc20c43 Mon Sep 17 00:00:00 2001 From: Mateusz Szostok Date: Tue, 24 Oct 2023 12:20:02 +0200 Subject: [PATCH 2/2] Apply suggestions after review --- cmd/botkube-agent/main.go | 4 ++-- pkg/bot/teams_cloud.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/botkube-agent/main.go b/cmd/botkube-agent/main.go index 38cd32629..5555ff5fa 100644 --- a/cmd/botkube-agent/main.go +++ b/cmd/botkube-agent/main.go @@ -300,11 +300,11 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.CloudTeams.Enabled { - sb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, reporter) + ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, reporter) if err != nil { return reportFatalError("while creating CloudSlack bot", err) } - scheduleBotNotifier(sb) + scheduleBotNotifier(ctb) } if commGroupCfg.Discord.Enabled { diff --git a/pkg/bot/teams_cloud.go b/pkg/bot/teams_cloud.go index 1e8089075..8e4f3b31d 100644 --- a/pkg/bot/teams_cloud.go +++ b/pkg/bot/teams_cloud.go @@ -181,7 +181,7 @@ func (b *CloudTeams) GetStatus() health.PlatformStatus { func (b *CloudTeams) start(ctx context.Context) error { svc, err := newGrpcCloudTeamsConnector(b.log, b.cfg.Server.URL) if err != nil { - return err + return fmt.Errorf("while creating gRPC connector") } defer svc.Shutdown() @@ -265,7 +265,7 @@ func (b *CloudTeams) handleStreamMessage(ctx context.Context, data *pb.CloudActi raw, err := json.Marshal(msg.Message) if err != nil { - return nil, err + return nil, fmt.Errorf("while marshaling message to trasfer it via gRPC: %w", err) } conversationRef := activity.GetCoversationReference(act)