Skip to content

Commit

Permalink
fixup! Do not re-use connections
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasbock committed Sep 13, 2024
1 parent 0f34839 commit cf4ecd5
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 38 deletions.
4 changes: 2 additions & 2 deletions Dockerfile-debug
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM ubuntu:24.04

LABEL maintainer="Canonical Sustaining Engineering <edward.hope-morley@canonical.com>"
LABEL org.opencontainers.image.description "Athena Monitor"
LABEL maintainer="Canonical Sustaining Engineering <nicolas.bock@canonical.com>"
LABEL org.opencontainers.image.description "Athena Debug Container"

RUN apt-get update
RUN DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends apt-utils
Expand Down
16 changes: 3 additions & 13 deletions cmd/processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,6 @@ func main() {
log.Debug(line)
}

filesClient, err := common.NewFilesComClient(cfg.FilesCom.Key, cfg.FilesCom.Endpoint)
if err != nil {
panic(err)
}

sfClient, err := common.NewSalesforceClient(cfg)
if err != nil {
panic(err)
}

natsClient, err := nats.NewNats("test-cluster", stan.NatsURL(*natsUrl))
if err != nil {
panic(err)
Expand All @@ -59,20 +49,20 @@ func main() {
salesforceClientFactory := &common.BaseSalesforceClientFactory{}
filesComClientFactory := &common.BaseFilesComClientFactory{}

p, err := processor.NewProcessor(filesClient, sfClient, filesComClientFactory, salesforceClientFactory, natsClient, cfg, nil)
p, err := processor.NewProcessor(filesComClientFactory, salesforceClientFactory, natsClient, cfg, nil)
if err != nil {
panic(err)
}

ctx, cancel := context.WithCancel(context.Background())

if err := p.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient,
if err := p.Run(ctx, func(
filesComClientFactory common.FilesComClientFactory,
salesforceClientFactory common.SalesforceClientFactory,
name, topic string,
reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber {
log.Infof("Subscribing: %s - to topic: %s", name, topic)
return processor.NewBaseSubscriber(fc, sf, filesComClientFactory, salesforceClientFactory, name, topic, reports, cfg, dbConn)
return processor.NewBaseSubscriber(filesComClientFactory, salesforceClientFactory, name, topic, reports, cfg, dbConn)
}); err != nil {
panic(err)
}
Expand Down
32 changes: 14 additions & 18 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,19 @@ import (
type Processor struct {
Config *config.Config
Db *gorm.DB
FilesClient common.FilesComClient
FilesComClientFactory common.FilesComClientFactory
Hostname string
Provider pubsub.Provider
SalesforceClient common.SalesforceClient
SalesforceClientFactory common.SalesforceClientFactory
}

type BaseSubscriber struct {
Config *config.Config
Db *gorm.DB
FilesComClient common.FilesComClient
FilesComClientFactory common.FilesComClientFactory
Name string
Options pubsub.HandlerOptions
Reports map[string]config.Report
SalesforceClient common.SalesforceClient
SalesforceClientFactory common.SalesforceClientFactory
}

Expand Down Expand Up @@ -232,7 +228,7 @@ func renderTemplate(ctx *pongo2.Context, data string) (string, error) {
return out, nil
}

func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceClient, fc common.FilesComClient,
func NewReportRunner(cfg *config.Config, dbConn *gorm.DB,
salesforceClientFactory common.SalesforceClientFactory,
filesComClientFactory common.FilesComClientFactory,
subscriber, name string,
Expand Down Expand Up @@ -283,7 +279,7 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
var scripts = make(map[string]string)

for reportName, report := range reports {
log.Debugf("Running %s script(s) (num=%d)", reportName, len(report.Scripts))
log.Debugf("running %d '%s' script(s)", len(report.Scripts), reportName)
for scriptName, script := range report.Scripts {
if script.Run == "" {
log.Errorf("No script provided to run on '%s'", scriptName)
Expand Down Expand Up @@ -342,7 +338,7 @@ func (runner *ReportRunner) Clean() error {
}

func (s *BaseSubscriber) Handler(_ context.Context, file *db.File, msg *pubsub.Msg) error {
runner, err := NewReportRunner(s.Config, s.Db, s.SalesforceClient, s.FilesComClient, s.SalesforceClientFactory, s.FilesComClientFactory, s.Name, s.Options.Topic, file, s.Reports)
runner, err := NewReportRunner(s.Config, s.Db, s.SalesforceClientFactory, s.FilesComClientFactory, s.Name, s.Options.Topic, file, s.Reports)
if err != nil {
log.Errorf("Failed to get new runner: %s", err)
msg.Ack()
Expand All @@ -360,7 +356,7 @@ func (s *BaseSubscriber) Handler(_ context.Context, file *db.File, msg *pubsub.M

const defaultHandlerDeadline = 10 * time.Minute

func NewBaseSubscriber(filesClient common.FilesComClient, salesforceClient common.SalesforceClient,
func NewBaseSubscriber(
filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory,
name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) *BaseSubscriber {
var subscriber = BaseSubscriber{
Expand All @@ -376,16 +372,14 @@ func NewBaseSubscriber(filesClient common.FilesComClient, salesforceClient commo

subscriber.Config = cfg
subscriber.Db = dbConn
subscriber.FilesComClient = filesClient
subscriber.FilesComClientFactory = filesComClientFactory
subscriber.Name = topic
subscriber.Options.Handler = subscriber.Handler
subscriber.SalesforceClient = salesforceClient
subscriber.SalesforceClientFactory = salesforceClientFactory
return &subscriber
}

func NewProcessor(filesClient common.FilesComClient, salesforceClient common.SalesforceClient,
func NewProcessor(
filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory,
provider pubsub.Provider, cfg *config.Config, dbConn *gorm.DB) (*Processor, error) {
var err error
Expand All @@ -404,11 +398,9 @@ func NewProcessor(filesClient common.FilesComClient, salesforceClient common.Sal
return &Processor{
Config: cfg,
Db: dbConn,
FilesClient: filesClient,
FilesComClientFactory: filesComClientFactory,
Hostname: hostname,
Provider: provider,
SalesforceClient: salesforceClient,
SalesforceClientFactory: salesforceClientFactory,
}, nil
}
Expand Down Expand Up @@ -490,6 +482,11 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time.
reportMap[report.Subscriber][report.CaseID][report.Name] = append(reportMap[report.Subscriber][report.CaseID][report.Name], report)
}

salesforceClient, err := p.SalesforceClientFactory.NewSalesforceClient(p.Config)
if err != nil {
log.Errorf("failed to get Salesforce client: %s", err)
return
}
for subscriberName, caseMap := range reportMap {
for caseId, reportsByType := range caseMap {
for _, reports := range reportsByType {
Expand Down Expand Up @@ -528,10 +525,10 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time.
}
var comment *simpleforce.SObject
if p.Config.Salesforce.EnableChatter {
comment = p.SalesforceClient.PostChatter(caseId,
comment = salesforceClient.PostChatter(caseId,
chunkHeader+chunk, subscriber.SFCommentIsPublic)
} else {
comment = p.SalesforceClient.PostComment(caseId,
comment = salesforceClient.PostComment(caseId,
chunkHeader+chunk, subscriber.SFCommentIsPublic)
}
if comment == nil {
Expand All @@ -551,8 +548,7 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time.
}
}

func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient common.FilesComClient,
salesforceClient common.SalesforceClient,
func (p *Processor) Run(ctx context.Context, newSubscriberFn func(
filesComClientFactory common.FilesComClientFactory,
salesforceClientFactory common.SalesforceClientFactory,
name, topic string, reports map[string]config.Report,
Expand All @@ -571,7 +567,7 @@ func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient co
})

for event := range p.Config.Processor.SubscribeTo {
go pubsub.Subscribe(newSubscriberFn(p.FilesClient, p.SalesforceClient, p.FilesComClientFactory, p.SalesforceClientFactory,
go pubsub.Subscribe(newSubscriberFn(p.FilesComClientFactory, p.SalesforceClientFactory,
p.Hostname, event, p.getReportsByTopic(event), p.Config, p.Db))
}

Expand Down
7 changes: 2 additions & 5 deletions pkg/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ func (s *MockSubscriber) Setup(c *pubsub.Client) {
}

func (s *ProcessorTestSuite) TestRunProcessor() {
filesComClient := test.FilesComClient{}
salesforceClient := test.SalesforceClient{}

provider := &memory.MemoryProvider{}
processor, _ := NewProcessor(&filesComClient, &salesforceClient, &test.FilesComClientFactory{}, &test.SalesforceClientFactory{}, provider, s.config, s.db)
processor, _ := NewProcessor(&test.FilesComClientFactory{}, &test.SalesforceClientFactory{}, provider, s.config, s.db)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
Expand All @@ -67,7 +64,7 @@ func (s *ProcessorTestSuite) TestRunProcessor() {

var called = 0

_ = processor.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient,
_ = processor.Run(ctx, func(
filesComClientFactory common.FilesComClientFactory,
salesforceClientFactory common.SalesforceClientFactory,
name string, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber {
Expand Down

0 comments on commit cf4ecd5

Please sign in to comment.