Skip to content

Commit

Permalink
renaming
Browse files Browse the repository at this point in the history
 * rename template notifications -> template events
 * rename pkg/event -> pkg/kafka
 * rename pkg/notifications -> pkg/event
  • Loading branch information
rverdile committed Feb 7, 2024
1 parent 6d23a79 commit a44f957
Show file tree
Hide file tree
Showing 14 changed files with 82 additions and 78 deletions.
2 changes: 1 addition & 1 deletion cmd/content-sources/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func main() {
mockRbac(ctx, &wg)
}
config.SetupNotifications()
config.SetupTemplatesNotifications()
config.SetupTemplateEvents()

wg.Wait()
}
Expand Down
2 changes: 1 addition & 1 deletion configs/config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ options:
paged_rpm_inserts_limit: 100
introspect_api_time_limit_sec: 0
enable_notifications: true
enable_templates_notifications: true
template_event_topic: "platform.content-sources.template"
# metrics:
# path: "/metrics"
# port: 9000
Expand Down
42 changes: 21 additions & 21 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
ce "github.com/content-services/content-sources-backend/pkg/errors"
"github.com/content-services/content-sources-backend/pkg/event"
"github.com/content-services/content-sources-backend/pkg/kafka"
"github.com/labstack/echo/v4"
clowder "github.com/redhatinsights/app-common-go/pkg/api/v1"
"github.com/rs/zerolog/log"
Expand All @@ -22,21 +22,21 @@ import (
const DefaultAppName = "content-sources"

type Configuration struct {
Database Database
Logging Logging
Loaded bool
Certs Certs
Options Options
Kafka event.KafkaConfig
Cloudwatch Cloudwatch
Metrics Metrics
Clients Clients `mapstructure:"clients"`
Mocks Mocks `mapstructure:"mocks"`
Sentry Sentry `mapstructure:"sentry"`
NotificationsClient cloudevents.Client `mapstructure:"notification_client"`
TemplatesNotificationsClient cloudevents.Client `mapstructure:"templates_notification_client"`
Tasking Tasking `mapstructure:"tasking"`
Features FeatureSet `mapstructure:"features"`
Database Database
Logging Logging
Loaded bool
Certs Certs
Options Options
Kafka kafka.KafkaConfig
Cloudwatch Cloudwatch
Metrics Metrics
Clients Clients `mapstructure:"clients"`
Mocks Mocks `mapstructure:"mocks"`
Sentry Sentry `mapstructure:"sentry"`
NotificationsClient cloudevents.Client `mapstructure:"notification_client"`
TemplateEventClient cloudevents.Client `mapstructure:"template_event_client"`
Tasking Tasking `mapstructure:"tasking"`
Features FeatureSet `mapstructure:"features"`
}

type Clients struct {
Expand Down Expand Up @@ -156,10 +156,10 @@ type Options struct {
PagedRpmInsertsLimit int `mapstructure:"paged_rpm_inserts_limit"`
IntrospectApiTimeLimitSec int `mapstructure:"introspect_api_time_limit_sec"`
// If true, introspection and snapshotting always runs for nightly job invocation, regardless of how soon they happened previously. Used for testing.
AlwaysRunCronTasks bool `mapstructure:"always_run_cron_tasks"`
EnableNotifications bool `mapstructure:"enable_notifications"`
EnableTemplatesNotifications bool `mapstructure:"enable_templates_notifications"`
RepositoryImportFilter string `mapstructure:"repository_import_filter"` // Used by qe to control which repos are imported
AlwaysRunCronTasks bool `mapstructure:"always_run_cron_tasks"`
EnableNotifications bool `mapstructure:"enable_notifications"`
TemplateEventTopic string `mapstructure:"template_event_topic"`
RepositoryImportFilter string `mapstructure:"repository_import_filter"` // Used by qe to control which repos are imported
}

type Metrics struct {
Expand Down Expand Up @@ -221,7 +221,7 @@ func setDefaults(v *viper.Viper) {
v.SetDefault("options.introspect_api_time_limit_sec", DefaultIntrospectApiTimeLimitSec)
v.SetDefault("options.always_run_cron_tasks", false)
v.SetDefault("options.enable_notifications", false)
v.SetDefault("options.enable_templates_notifications", false)
v.SetDefault("options.template_event_topic", "platform.content-sources.template")
v.SetDefault("options.repository_import_filter", "")
v.SetDefault("logging.level", "info")
v.SetDefault("logging.console", true)
Expand Down
34 changes: 19 additions & 15 deletions pkg/config/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/RedHatInsights/insights-operator-utils/tls"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
"github.com/cloudevents/sdk-go/v2"
"github.com/content-services/content-sources-backend/pkg/event"
"github.com/content-services/content-sources-backend/pkg/kafka"
clowder "github.com/redhatinsights/app-common-go/pkg/api/v1"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
Expand All @@ -26,7 +26,7 @@ func addEventConfigDefaults(options *viper.Viper) {
options.SetDefault("kafka.retry.backoff.ms", 100)
if clowder.IsClowderEnabled() {
cfg := clowder.LoadedConfig
event.TopicTranslationConfig = event.NewTopicTranslationWithClowder(cfg)
kafka.TopicTranslationConfig = kafka.NewTopicTranslationWithClowder(cfg)
options.SetDefault("kafka.bootstrap.servers", strings.Join(clowder.KafkaServers, ","))

// Prepare topics
Expand Down Expand Up @@ -62,7 +62,7 @@ func addEventConfigDefaults(options *viper.Viper) {
}
} else {
// If clowder is not present, set defaults to local configuration
event.TopicTranslationConfig = event.NewTopicTranslationWithClowder(nil)
kafka.TopicTranslationConfig = kafka.NewTopicTranslationWithClowder(nil)
options.SetDefault("kafka.bootstrap.servers", readEnv("KAFKA_BOOTSTRAP_SERVERS", ""))
}
}
Expand All @@ -75,15 +75,15 @@ func readEnv(key string, def string) string {
return value
}

// SetupCloudEventsKafkaClient create the cloud events kafka client that will send events to the given kafka topic
// SetupCloudEventsKafkaClient create the cloud event kafka client that will send event to the given kafka topic
func SetupCloudEventsKafkaClient(topic string) (v2.Client, error) {
kafkaServers := strings.Split(LoadedConfig.Kafka.Bootstrap.Servers, ",")
saramaConfig, err := GetSaramaConfig()
if err != nil {
return nil, fmt.Errorf("error getting sarama config: %w", err)
}

topicTranslator := event.NewTopicTranslationWithClowder(clowder.LoadedConfig)
topicTranslator := kafka.NewTopicTranslationWithClowder(clowder.LoadedConfig)
mappedTopicName := topicTranslator.GetReal(topic)

if mappedTopicName == "" {
Expand All @@ -97,31 +97,31 @@ func SetupCloudEventsKafkaClient(topic string) (v2.Client, error) {

c, err := v2.NewClient(protocol, v2.WithTimeNow(), v2.WithUUIDs())
if err != nil {
return nil, fmt.Errorf("failed to create cloud events client: %w", err)
return nil, fmt.Errorf("failed to create cloud event client: %w", err)
}
return c, nil
}

// SetupTemplatesNotifications creates the cloud events kafka client for sending events to the patch service
func SetupTemplatesNotifications() {
if !LoadedConfig.Options.EnableTemplatesNotifications {
// SetupTemplateEvents creates the cloud event kafka client for sending event to the patch service
func SetupTemplateEvents() {
if LoadedConfig.Options.TemplateEventTopic == "" {
return
}

if len(LoadedConfig.Kafka.Bootstrap.Servers) == 0 {
log.Warn().Msg("SetupTemplatesNotifications: clowder.KafkaServers and configured broker was empty")
log.Warn().Msg("SetupTemplateEvents: clowder.KafkaServers and configured broker was empty")
return
}

client, err := SetupCloudEventsKafkaClient("platform.content-sources.template")
client, err := SetupCloudEventsKafkaClient(LoadedConfig.Options.TemplateEventTopic)
if err != nil {
log.Error().Err(err).Msg("SetupTemplatesNotifications failed")
log.Error().Err(err).Msg("SetupTemplateEvents failed")
return
}
LoadedConfig.TemplatesNotificationsClient = client
LoadedConfig.TemplateEventClient = client
}

// SetupNotifications creates the cloud events kafka client for sending events to the notifications service
// SetupNotifications creates the cloud event kafka client for sending event to the event service
func SetupNotifications() {
if !LoadedConfig.Options.EnableNotifications {
return
Expand All @@ -146,6 +146,10 @@ func GetSaramaConfig() (*sarama.Config, error) {
saramaConfig.Version = sarama.V2_0_0_0
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

if strings.Contains(LoadedConfig.Kafka.Sasl.Protocol, "SSL") {
saramaConfig.Net.TLS.Enable = true
}

if LoadedConfig.Kafka.Capath != "" {
tlsConfig, err := tlsutil.NewTLSConfig(LoadedConfig.Kafka.Capath)
if err != nil {
Expand All @@ -162,7 +166,7 @@ func GetSaramaConfig() (*sarama.Config, error) {
if saramaConfig.Net.SASL.Mechanism == sarama.SASLTypeSCRAMSHA512 {
saramaConfig.Net.SASL.Handshake = true
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &event.SCRAMClient{HashGeneratorFcn: sha512.New}
return &kafka.SCRAMClient{HashGeneratorFcn: sha512.New}
}
}
}
Expand Down
34 changes: 17 additions & 17 deletions pkg/dao/repository_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/content-services/content-sources-backend/pkg/api"
"github.com/content-services/content-sources-backend/pkg/config"
ce "github.com/content-services/content-sources-backend/pkg/errors"
"github.com/content-services/content-sources-backend/pkg/event"
"github.com/content-services/content-sources-backend/pkg/models"
"github.com/content-services/content-sources-backend/pkg/notifications"
"github.com/content-services/content-sources-backend/pkg/pulp_client"
"github.com/content-services/yummy/pkg/yum"
"github.com/jackc/pgx/v5/pgconn"
Expand Down Expand Up @@ -118,10 +118,10 @@ func (r repositoryConfigDaoImpl) Create(newRepoReq api.RepositoryRequest) (api.R
created.URL = newRepo.URL
created.Status = newRepo.Status

notifications.SendNotification(
event.SendNotification(
newRepoConfig.OrgID,
notifications.RepositoryCreated,
[]repositories.Repositories{notifications.MapRepositoryResponse(created)},
event.RepositoryCreated,
[]repositories.Repositories{event.MapRepositoryResponse(created)},
)

return created, nil
Expand All @@ -142,9 +142,9 @@ func (r repositoryConfigDaoImpl) BulkCreate(newRepositories []api.RepositoryRequ

mappedValues := []repositories.Repositories{}
for i := 0; i < len(responses); i++ {
mappedValues = append(mappedValues, notifications.MapRepositoryResponse(responses[i]))
mappedValues = append(mappedValues, event.MapRepositoryResponse(responses[i]))
}
notifications.SendNotification(*newRepositories[0].OrgID, notifications.RepositoryCreated, mappedValues)
event.SendNotification(*newRepositories[0].OrgID, event.RepositoryCreated, mappedValues)

return responses, errs
}
Expand Down Expand Up @@ -517,10 +517,10 @@ func (r repositoryConfigDaoImpl) Update(orgID, uuid string, repoParams api.Repos
repositoryResponse := api.RepositoryResponse{}
ModelToApiFields(repoConfig, &repositoryResponse)

notifications.SendNotification(
event.SendNotification(
orgID,
notifications.RepositoryUpdated,
[]repositories.Repositories{notifications.MapRepositoryResponse(repositoryResponse)},
event.RepositoryUpdated,
[]repositories.Repositories{event.MapRepositoryResponse(repositoryResponse)},
)
return nil
})
Expand All @@ -531,10 +531,10 @@ func (r repositoryConfigDaoImpl) Update(orgID, uuid string, repoParams api.Repos
repositoryResponse := api.RepositoryResponse{}
ModelToApiFields(repoConfig, &repositoryResponse)

notifications.SendNotification(
event.SendNotification(
orgID,
notifications.RepositoryUpdated,
[]repositories.Repositories{notifications.MapRepositoryResponse(repositoryResponse)},
event.RepositoryUpdated,
[]repositories.Repositories{event.MapRepositoryResponse(repositoryResponse)},
)

repoConfig.Repository = models.Repository{}
Expand Down Expand Up @@ -593,10 +593,10 @@ func (r repositoryConfigDaoImpl) SoftDelete(orgID string, uuid string) error {
repositoryResponse := api.RepositoryResponse{}
ModelToApiFields(repoConfig, &repositoryResponse)

notifications.SendNotification(
event.SendNotification(
orgID,
notifications.RepositoryDeleted,
[]repositories.Repositories{notifications.MapRepositoryResponse(repositoryResponse)},
event.RepositoryDeleted,
[]repositories.Repositories{event.MapRepositoryResponse(repositoryResponse)},
)

return nil
Expand Down Expand Up @@ -636,9 +636,9 @@ func (r repositoryConfigDaoImpl) BulkDelete(orgID string, uuids []string) []erro
if len(responses) > 0 {
mappedValues := make([]repositories.Repositories, len(responses))
for i := 0; i < len(responses); i++ {
mappedValues[i] = notifications.MapRepositoryResponse(responses[i])
mappedValues[i] = event.MapRepositoryResponse(responses[i])
}
notifications.SendNotification(orgID, notifications.RepositoryDeleted, mappedValues)
event.SendNotification(orgID, event.RepositoryDeleted, mappedValues)
}

return errs
Expand Down
8 changes: 4 additions & 4 deletions pkg/dao/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/content-services/content-sources-backend/pkg/api"
"github.com/content-services/content-sources-backend/pkg/config"
ce "github.com/content-services/content-sources-backend/pkg/errors"
"github.com/content-services/content-sources-backend/pkg/event"
"github.com/content-services/content-sources-backend/pkg/models"
"github.com/content-services/content-sources-backend/pkg/notifications"
"github.com/jackc/pgx/v5/pgconn"
"gorm.io/gorm"
"gorm.io/gorm/clause"
Expand Down Expand Up @@ -89,7 +89,7 @@ func (t templateDaoImpl) create(tx *gorm.DB, reqTemplate api.TemplateRequest) (a
templatesModelToApi(modelTemplate, &respTemplate)
respTemplate.RepositoryUUIDS = reqTemplate.RepositoryUUIDS

notifications.SendTemplatesNotification(*reqTemplate.OrgID, notifications.TemplateCreated, []api.TemplateResponse{respTemplate})
event.SendTemplateEvent(*reqTemplate.OrgID, event.TemplateCreated, []api.TemplateResponse{respTemplate})

return respTemplate, nil
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func (t templateDaoImpl) Update(orgID string, uuid string, templParams api.Templ
return resp, fmt.Errorf("could not fetch template %w", err)
}

notifications.SendTemplatesNotification(orgID, notifications.TemplateUpdated, []api.TemplateResponse{resp})
event.SendTemplateEvent(orgID, event.TemplateUpdated, []api.TemplateResponse{resp})

return resp, err
}
Expand Down Expand Up @@ -281,7 +281,7 @@ func (t templateDaoImpl) SoftDelete(orgID string, uuid string) error {

var resp api.TemplateResponse
templatesModelToApi(modelTemplate, &resp)
notifications.SendTemplatesNotification(orgID, notifications.TemplateDeleted, []api.TemplateResponse{resp})
event.SendTemplateEvent(orgID, event.TemplateDeleted, []api.TemplateResponse{resp})

return nil
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/notifications/client.go → pkg/event/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package notifications
package event

import (
"context"
Expand All @@ -12,7 +12,7 @@ import (
"github.com/rs/zerolog/log"
)

// SendNotification - Sends a notification about a repository to the notifications service
// SendNotification - Sends an event about a repository to the event service
func SendNotification(orgID string, eventName EventName, repos []repositories.Repositories) {
if config.Get().NotificationsClient != nil && len(repos) > 0 {
eventNameStr := eventName.String()
Expand Down Expand Up @@ -78,9 +78,9 @@ func SetEmptyToNil(value string) *string {
return &value
}

// SendTemplatesNotification - Sends a notification about a template to the patch service
func SendTemplatesNotification(orgID string, eventName EventName, templates []api.TemplateResponse) {
if config.Get().TemplatesNotificationsClient != nil && len(templates) > 0 {
// SendTemplateEvent - Sends an event about a template to the patch service
func SendTemplateEvent(orgID string, eventName EventName, templates []api.TemplateResponse) {
if config.Get().TemplateEventClient != nil && len(templates) > 0 {
eventNameStr := eventName.String()
newUUID, _ := uuid.NewRandom()
e := cloudevents.NewEvent()
Expand All @@ -101,7 +101,7 @@ func SendTemplatesNotification(orgID string, eventName EventName, templates []ap

ctx := cloudevents.WithEncodingStructured(context.Background())
// Send the event
if result := config.Get().TemplatesNotificationsClient.Send(ctx, e); cloudevents.IsUndelivered(result) {
if result := config.Get().TemplateEventClient.Send(ctx, e); cloudevents.IsUndelivered(result) {
log.Error().Msgf("Notification message failed to send: %v", result)
return
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package notifications
package event

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package notifications
package event

type EventName int

Expand Down
Loading

0 comments on commit a44f957

Please sign in to comment.