Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

email-exporter: Initial implementation #7998

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
99f0fd9
email: Initial Pardot client implementation
beautifulentropy Feb 6, 2025
fd39210
email: Initial Exporter implementation
beautifulentropy Feb 6, 2025
9dc4033
cmd/email-exporter: Initial implementation
beautifulentropy Feb 6, 2025
e8ff500
Adjust proto.
beautifulentropy Feb 12, 2025
ccb76c2
email/pardot: Address comments
beautifulentropy Feb 12, 2025
cdff8d1
email/exporter: Address comments
beautifulentropy Feb 12, 2025
da59011
cmd/email-exporter: Address comments
beautifulentropy Feb 12, 2025
2894232
cmd: Add email-exporter subcommand
beautifulentropy Feb 12, 2025
dfce977
test: Add email-exporter configuration
beautifulentropy Feb 12, 2025
5b74c96
test/pardot-test-srv: Initial implementation of mock API server
beautifulentropy Feb 12, 2025
e40db1c
startservers: Add email-exporter and pardot-test-srv
beautifulentropy Feb 12, 2025
6b60eb3
Satisfy Typos
beautifulentropy Feb 13, 2025
79345c4
wfe: Add email-exporter and provide mocks
beautifulentropy Feb 14, 2025
081f31f
email-exporter: Simplify implementation and make per-day limit config…
beautifulentropy Feb 14, 2025
5f38309
integration: Add end-to-end test of the email-exporter
beautifulentropy Feb 14, 2025
30c983b
Merge remote-tracking branch 'origin/main' into add-email-exporter
beautifulentropy Feb 14, 2025
fab5b5e
Lint JSON configs
beautifulentropy Feb 14, 2025
ff049b7
Fix integration test.
beautifulentropy Feb 14, 2025
4c788db
email/exporter: Add unit test coverage
beautifulentropy Feb 14, 2025
ec6d768
email/pardot: Add unit tests
beautifulentropy Feb 19, 2025
5100332
email/exporter: Fix race in unit tests
beautifulentropy Feb 19, 2025
badc330
Prospect(s) -> Contact(s)
beautifulentropy Feb 19, 2025
6402316
Merge remote-tracking branch 'origin/main' into add-email-exporter
beautifulentropy Feb 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ VERSION ?= 1.0.0
EPOCH ?= 1
MAINTAINER ?= "Community"

CMDS = admin boulder ceremony ct-test-srv
CMDS = admin boulder ceremony ct-test-srv pardot-test-srv
CMD_BINS = $(addprefix bin/, $(CMDS) )
OBJECTS = $(CMD_BINS)

Expand Down
14 changes: 12 additions & 2 deletions cmd/boulder-wfe2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/config"
emailpb "github.com/letsencrypt/boulder/email/proto"
"github.com/letsencrypt/boulder/features"
"github.com/letsencrypt/boulder/goodkey"
"github.com/letsencrypt/boulder/goodkey/sagoodkey"
Expand Down Expand Up @@ -59,8 +60,9 @@ type Config struct {

TLS cmd.TLSConfig

RAService *cmd.GRPCClientConfig
SAService *cmd.GRPCClientConfig
RAService *cmd.GRPCClientConfig
SAService *cmd.GRPCClientConfig
EmailExporter *cmd.GRPCClientConfig

// GetNonceService is a gRPC config which contains a single SRV name
// used to lookup nonce-service instances used exclusively for nonce
Expand Down Expand Up @@ -285,6 +287,13 @@ func main() {
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
sac := sapb.NewStorageAuthorityReadOnlyClient(saConn)

var eec emailpb.ExporterClient
if c.WFE.EmailExporter != nil {
emailExporterConn, err := bgrpc.ClientSetup(c.WFE.EmailExporter, tlsConfig, stats, clk)
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
eec = emailpb.NewExporterClient(emailExporterConn)
}

if c.WFE.RedeemNonceService == nil {
cmd.Fail("'redeemNonceService' must be configured.")
}
Expand Down Expand Up @@ -351,6 +360,7 @@ func main() {
c.WFE.StaleTimeout.Duration,
rac,
sac,
eec,
gnc,
rnc,
noncePrefixKey,
Expand Down
1 change: 1 addition & 0 deletions cmd/boulder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
_ "github.com/letsencrypt/boulder/cmd/crl-checker"
_ "github.com/letsencrypt/boulder/cmd/crl-storer"
_ "github.com/letsencrypt/boulder/cmd/crl-updater"
_ "github.com/letsencrypt/boulder/cmd/email-exporter"
_ "github.com/letsencrypt/boulder/cmd/expiration-mailer"
_ "github.com/letsencrypt/boulder/cmd/id-exporter"
_ "github.com/letsencrypt/boulder/cmd/log-validator"
Expand Down
107 changes: 107 additions & 0 deletions cmd/email-exporter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package notmain

import (
"context"
"flag"
"os"

"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/email"
emailpb "github.com/letsencrypt/boulder/email/proto"
bgrpc "github.com/letsencrypt/boulder/grpc"
)

// Config holds the configuration for the email-exporter service.
type Config struct {
EmailExporter struct {
cmd.ServiceConfig

// PerDayLimit is our daily limit as determined by the tier of our
// Salesforce account. For more information, see:
// https://developer.salesforce.com/docs/marketing/pardot/guide/overview.html?q=rate%20limits
PerDayLimit float64 `validate:"required,min=1"`

// PardotBusinessUnit is the Pardot business unit to use.
PardotBusinessUnit string `validate:"required"`

// ClientId is the OAuth API client ID provided by Salesforce.
ClientId cmd.PasswordConfig

// ClientSecret is the OAuth API client secret provided by Salesforce.
ClientSecret cmd.PasswordConfig

// SalesforceBaseURL is the base URL for the Salesforce API. (e.g.,
// "https://login.salesforce.com")
SalesforceBaseURL string `validate:"required"`

// PardotBaseURL is the base URL for the Pardot API. (e.g.,
// "https://pi.pardot.com")
PardotBaseURL string `validate:"required"`
}
Syslog cmd.SyslogConfig
OpenTelemetry cmd.OpenTelemetryConfig
}

func main() {
configFile := flag.String("config", "", "Path to configuration file")
grpcAddr := flag.String("addr", "", "gRPC listen address override")
debugAddr := flag.String("debug-addr", "", "Debug server address override")
flag.Parse()

if *configFile == "" {
flag.Usage()
os.Exit(1)
}

var c Config
err := cmd.ReadConfigFile(*configFile, &c)
cmd.FailOnError(err, "Reading JSON config file into config structure")

if *grpcAddr != "" {
c.EmailExporter.ServiceConfig.GRPC.Address = *grpcAddr
}
if *debugAddr != "" {
c.EmailExporter.ServiceConfig.DebugAddr = *debugAddr
}

scope, logger, oTelShutdown := cmd.StatsAndLogging(c.Syslog, c.OpenTelemetry, c.EmailExporter.ServiceConfig.DebugAddr)
defer oTelShutdown(context.Background())

logger.Info(cmd.VersionString())

clk := cmd.Clock()
clientId, err := c.EmailExporter.ClientId.Pass()
cmd.FailOnError(err, "Loading client ID")
clientSecret, err := c.EmailExporter.ClientSecret.Pass()
cmd.FailOnError(err, "Loading client secret")

pardotClient, err := email.NewPardotClientImpl(
clk,
c.EmailExporter.PardotBusinessUnit,
clientId,
clientSecret,
c.EmailExporter.SalesforceBaseURL,
c.EmailExporter.PardotBaseURL,
)
cmd.FailOnError(err, "Creating Pardot client")
exporterServer := email.NewExporterImpl(pardotClient, c.EmailExporter.PerDayLimit, scope, logger)

tlsConfig, err := c.EmailExporter.TLS.Load(scope)
cmd.FailOnError(err, "Loading TLS config")

daemonCtx, shutdownExporterServer := context.WithCancel(context.Background())
go exporterServer.Start(daemonCtx)

start, err := bgrpc.NewServer(c.EmailExporter.GRPC, logger).Add(
&emailpb.Exporter_ServiceDesc, exporterServer).Build(tlsConfig, scope, clk)
cmd.FailOnError(err, "Configuring gRPC server")

err = start()
shutdownExporterServer()
exporterServer.Drain()
cmd.FailOnError(err, "email-exporter gRPC service failed to start")
}

func init() {
cmd.RegisterCommand("email-exporter", main, &cmd.ConfigValidator{Config: &Config{}})
}
161 changes: 161 additions & 0 deletions email/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package email

import (
"context"
"errors"
"sync"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/letsencrypt/boulder/core"
emailpb "github.com/letsencrypt/boulder/email/proto"
berrors "github.com/letsencrypt/boulder/errors"
blog "github.com/letsencrypt/boulder/log"
)

const (
// five is the number of concurrent workers processing the email queue. This
// number was chosen specifically to match the number of concurrent
// connections allowed by the Pardot API.
five = 5

// queueCap enforces a maximum stack size to prevent unbounded growth.
queueCap = 10000
)

var ErrQueueFull = errors.New("email-exporter queue is full")

// ExporterImpl implements the gRPC server and processes email exports.
type ExporterImpl struct {
emailpb.UnsafeExporterServer

sync.Mutex
drainWG sync.WaitGroup
wake *sync.Cond

limiter *rate.Limiter
toSend []string
client PardotClient
emailsHandledCounter prometheus.Counter
log blog.Logger
}

var _ emailpb.ExporterServer = (*ExporterImpl)(nil)

// NewExporterImpl creates a new ExporterImpl.
func NewExporterImpl(client PardotClient, perDayLimit float64, scope prometheus.Registerer, logger blog.Logger) *ExporterImpl {
// This limiter enforces the daily Pardot API limit and restricts
// concurrency to the maximum of 5 requests specified in their
// documentation. For more details see:
// https://developer.salesforce.com/docs/marketing/pardot/guide/overview.html?q=rate%20limits
limiter := rate.NewLimiter(rate.Limit(perDayLimit/86400.0), 5)

emailsHandledCounter := prometheus.NewCounter(prometheus.CounterOpts{
Name: "email_exporter_emails_handled",
Help: "Total number of emails handled by the email exporter",
})
scope.MustRegister(emailsHandledCounter)

impl := &ExporterImpl{
limiter: limiter,
toSend: make([]string, 0, queueCap),
client: client,
emailsHandledCounter: emailsHandledCounter,
log: logger,
}
impl.wake = sync.NewCond(&impl.Mutex)

queueGauge := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "email_exporter_queue_length",
Help: "Current length of the email export queue",
}, func() float64 {
impl.Lock()
defer impl.Unlock()
return float64(len(impl.toSend))
})
scope.MustRegister(queueGauge)

return impl
}

// CreateProspects enqueues the provided email addresses. If the queue cannot
// accommodate the new emails, an ErrQueueFull is returned.
func (impl *ExporterImpl) CreateProspects(ctx context.Context, req *emailpb.CreateProspectsRequest) (*emptypb.Empty, error) {
if core.IsAnyNilOrZero(req, req.Emails) {
return nil, berrors.InternalServerError("Incomplete UpsertEmails request")
}

impl.Lock()
spotsLeft := queueCap - len(impl.toSend)
if spotsLeft < len(req.Emails) {
return nil, ErrQueueFull
}
impl.toSend = append(impl.toSend, req.Emails...)
impl.Unlock()
// Wake waiting workers to process the new emails.
impl.wake.Broadcast()

return &emptypb.Empty{}, nil
}

// Start begins asynchronous processing of the email queue. When the parent
// daemonCtx is cancelled the queue will be drained and the workers will exit.
func (impl *ExporterImpl) Start(daemonCtx context.Context) {
go func() {
<-daemonCtx.Done()
impl.Lock()
// Wake waiting workers to exit.
impl.wake.Broadcast()
impl.Unlock()
}()

worker := func() {
defer impl.drainWG.Done()
for {
impl.Lock()

for len(impl.toSend) == 0 && daemonCtx.Err() == nil {
// Wait for the queue to be updated or the daemon to exit.
impl.wake.Wait()
}

if len(impl.toSend) == 0 && daemonCtx.Err() != nil {
// No more emails to process, exit.
impl.Unlock()
return
}

// Dequeue and dispatch an email.
last := len(impl.toSend) - 1
email := impl.toSend[last]
impl.toSend = impl.toSend[:last]
impl.Unlock()

err := impl.limiter.Wait(daemonCtx)
if err != nil {
if !errors.Is(err, context.Canceled) {
impl.log.Errf("Unexpected limiter.Wait() error: %s", err)
continue
}
}

err = impl.client.CreateProspect(email)
if err != nil {
impl.log.Errf("Sending Prospect to Pardot: %s", err)
}
impl.emailsHandledCounter.Inc()
}
}

for range five {
impl.drainWG.Add(1)
go worker()
}
}

// Drain blocks until all workers have finished processing the email queue.
func (impl *ExporterImpl) Drain() {
impl.drainWG.Wait()
}
Loading
Loading