From cf4ecd5e10368721f3b89c56ced35c5d8066879c Mon Sep 17 00:00:00 2001 From: Nicolas Bock Date: Fri, 13 Sep 2024 13:04:17 -0600 Subject: [PATCH] fixup! Do not re-use connections --- Dockerfile-debug | 4 ++-- cmd/processor/main.go | 16 +++------------- pkg/processor/processor.go | 32 ++++++++++++++------------------ pkg/processor/processor_test.go | 7 ++----- 4 files changed, 21 insertions(+), 38 deletions(-) diff --git a/Dockerfile-debug b/Dockerfile-debug index ca0f25e..2353359 100644 --- a/Dockerfile-debug +++ b/Dockerfile-debug @@ -1,7 +1,7 @@ FROM ubuntu:24.04 -LABEL maintainer="Canonical Sustaining Engineering " -LABEL org.opencontainers.image.description "Athena Monitor" +LABEL maintainer="Canonical Sustaining Engineering " +LABEL org.opencontainers.image.description "Athena Debug Container" RUN apt-get update RUN DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends apt-utils diff --git a/cmd/processor/main.go b/cmd/processor/main.go index 339b685..2a152d4 100644 --- a/cmd/processor/main.go +++ b/cmd/processor/main.go @@ -41,16 +41,6 @@ func main() { log.Debug(line) } - filesClient, err := common.NewFilesComClient(cfg.FilesCom.Key, cfg.FilesCom.Endpoint) - if err != nil { - panic(err) - } - - sfClient, err := common.NewSalesforceClient(cfg) - if err != nil { - panic(err) - } - natsClient, err := nats.NewNats("test-cluster", stan.NatsURL(*natsUrl)) if err != nil { panic(err) @@ -59,20 +49,20 @@ func main() { salesforceClientFactory := &common.BaseSalesforceClientFactory{} filesComClientFactory := &common.BaseFilesComClientFactory{} - p, err := processor.NewProcessor(filesClient, sfClient, filesComClientFactory, salesforceClientFactory, natsClient, cfg, nil) + p, err := processor.NewProcessor(filesComClientFactory, salesforceClientFactory, natsClient, cfg, nil) if err != nil { panic(err) } ctx, cancel := context.WithCancel(context.Background()) - if err := p.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient, + if err := p.Run(ctx, func( filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory, name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber { log.Infof("Subscribing: %s - to topic: %s", name, topic) - return processor.NewBaseSubscriber(fc, sf, filesComClientFactory, salesforceClientFactory, name, topic, reports, cfg, dbConn) + return processor.NewBaseSubscriber(filesComClientFactory, salesforceClientFactory, name, topic, reports, cfg, dbConn) }); err != nil { panic(err) } diff --git a/pkg/processor/processor.go b/pkg/processor/processor.go index d2208e8..6571054 100644 --- a/pkg/processor/processor.go +++ b/pkg/processor/processor.go @@ -25,23 +25,19 @@ import ( type Processor struct { Config *config.Config Db *gorm.DB - FilesClient common.FilesComClient FilesComClientFactory common.FilesComClientFactory Hostname string Provider pubsub.Provider - SalesforceClient common.SalesforceClient SalesforceClientFactory common.SalesforceClientFactory } type BaseSubscriber struct { Config *config.Config Db *gorm.DB - FilesComClient common.FilesComClient FilesComClientFactory common.FilesComClientFactory Name string Options pubsub.HandlerOptions Reports map[string]config.Report - SalesforceClient common.SalesforceClient SalesforceClientFactory common.SalesforceClientFactory } @@ -232,7 +228,7 @@ func renderTemplate(ctx *pongo2.Context, data string) (string, error) { return out, nil } -func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceClient, fc common.FilesComClient, +func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, salesforceClientFactory common.SalesforceClientFactory, filesComClientFactory common.FilesComClientFactory, subscriber, name string, @@ -283,7 +279,7 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl var scripts = make(map[string]string) for reportName, report := range reports { - log.Debugf("Running %s script(s) (num=%d)", reportName, len(report.Scripts)) + log.Debugf("running %d '%s' script(s)", len(report.Scripts), reportName) for scriptName, script := range report.Scripts { if script.Run == "" { log.Errorf("No script provided to run on '%s'", scriptName) @@ -342,7 +338,7 @@ func (runner *ReportRunner) Clean() error { } func (s *BaseSubscriber) Handler(_ context.Context, file *db.File, msg *pubsub.Msg) error { - runner, err := NewReportRunner(s.Config, s.Db, s.SalesforceClient, s.FilesComClient, s.SalesforceClientFactory, s.FilesComClientFactory, s.Name, s.Options.Topic, file, s.Reports) + runner, err := NewReportRunner(s.Config, s.Db, s.SalesforceClientFactory, s.FilesComClientFactory, s.Name, s.Options.Topic, file, s.Reports) if err != nil { log.Errorf("Failed to get new runner: %s", err) msg.Ack() @@ -360,7 +356,7 @@ func (s *BaseSubscriber) Handler(_ context.Context, file *db.File, msg *pubsub.M const defaultHandlerDeadline = 10 * time.Minute -func NewBaseSubscriber(filesClient common.FilesComClient, salesforceClient common.SalesforceClient, +func NewBaseSubscriber( filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory, name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) *BaseSubscriber { var subscriber = BaseSubscriber{ @@ -376,16 +372,14 @@ func NewBaseSubscriber(filesClient common.FilesComClient, salesforceClient commo subscriber.Config = cfg subscriber.Db = dbConn - subscriber.FilesComClient = filesClient subscriber.FilesComClientFactory = filesComClientFactory subscriber.Name = topic subscriber.Options.Handler = subscriber.Handler - subscriber.SalesforceClient = salesforceClient subscriber.SalesforceClientFactory = salesforceClientFactory return &subscriber } -func NewProcessor(filesClient common.FilesComClient, salesforceClient common.SalesforceClient, +func NewProcessor( filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory, provider pubsub.Provider, cfg *config.Config, dbConn *gorm.DB) (*Processor, error) { var err error @@ -404,11 +398,9 @@ func NewProcessor(filesClient common.FilesComClient, salesforceClient common.Sal return &Processor{ Config: cfg, Db: dbConn, - FilesClient: filesClient, FilesComClientFactory: filesComClientFactory, Hostname: hostname, Provider: provider, - SalesforceClient: salesforceClient, SalesforceClientFactory: salesforceClientFactory, }, nil } @@ -490,6 +482,11 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time. reportMap[report.Subscriber][report.CaseID][report.Name] = append(reportMap[report.Subscriber][report.CaseID][report.Name], report) } + salesforceClient, err := p.SalesforceClientFactory.NewSalesforceClient(p.Config) + if err != nil { + log.Errorf("failed to get Salesforce client: %s", err) + return + } for subscriberName, caseMap := range reportMap { for caseId, reportsByType := range caseMap { for _, reports := range reportsByType { @@ -528,10 +525,10 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time. } var comment *simpleforce.SObject if p.Config.Salesforce.EnableChatter { - comment = p.SalesforceClient.PostChatter(caseId, + comment = salesforceClient.PostChatter(caseId, chunkHeader+chunk, subscriber.SFCommentIsPublic) } else { - comment = p.SalesforceClient.PostComment(caseId, + comment = salesforceClient.PostComment(caseId, chunkHeader+chunk, subscriber.SFCommentIsPublic) } if comment == nil { @@ -551,8 +548,7 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time. } } -func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient common.FilesComClient, - salesforceClient common.SalesforceClient, +func (p *Processor) Run(ctx context.Context, newSubscriberFn func( filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory, name, topic string, reports map[string]config.Report, @@ -571,7 +567,7 @@ func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient co }) for event := range p.Config.Processor.SubscribeTo { - go pubsub.Subscribe(newSubscriberFn(p.FilesClient, p.SalesforceClient, p.FilesComClientFactory, p.SalesforceClientFactory, + go pubsub.Subscribe(newSubscriberFn(p.FilesComClientFactory, p.SalesforceClientFactory, p.Hostname, event, p.getReportsByTopic(event), p.Config, p.Db)) } diff --git a/pkg/processor/processor_test.go b/pkg/processor/processor_test.go index 3851499..859b1ec 100644 --- a/pkg/processor/processor_test.go +++ b/pkg/processor/processor_test.go @@ -48,11 +48,8 @@ func (s *MockSubscriber) Setup(c *pubsub.Client) { } func (s *ProcessorTestSuite) TestRunProcessor() { - filesComClient := test.FilesComClient{} - salesforceClient := test.SalesforceClient{} - provider := &memory.MemoryProvider{} - processor, _ := NewProcessor(&filesComClient, &salesforceClient, &test.FilesComClientFactory{}, &test.SalesforceClientFactory{}, provider, s.config, s.db) + processor, _ := NewProcessor(&test.FilesComClientFactory{}, &test.SalesforceClientFactory{}, provider, s.config, s.db) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -67,7 +64,7 @@ func (s *ProcessorTestSuite) TestRunProcessor() { var called = 0 - _ = processor.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient, + _ = processor.Run(ctx, func( filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory, name string, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber {