diff --git a/README.md b/README.md index d91beaa..5ec4f0c 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,17 @@ # Axiom Lambda Extension - - - - - - Axiom.co banner - - -  - Use Axiom Lambda Extension to send logs and platform events of your Lambda function to [Axiom](https://axiom.co/). Axiom detects the extension and provides you with quick filters and a dashboard. With the Axiom Lambda extension, you can forget about the extra configuration of CloudWatch and subscription filters. +## Usage + + +```sh +aws lambda update-function-configuration --function-name my-function \ + --layers arn:aws:lambda:AWS_REGION:694952825951:layer:axiom-extension-ARCH:VERSION +``` + ## Documentation For more information on how to set up and use the Axiom Lambda Extension, see the [Axiom documentation](https://axiom.co/docs/send-data/aws-lambda). diff --git a/decision/0001-no-wait-for-first-invocation.md b/decision/0001-no-wait-for-first-invocation.md new file mode 100644 index 0000000..afa76a6 --- /dev/null +++ b/decision/0001-no-wait-for-first-invocation.md @@ -0,0 +1,23 @@ +author: Islam Shehata (@dasfmi) +date: 2024-011-14 +--- + +# No wait for first invocation + +## Background + +We had a mechanism in the extension that blocks execution until we receive the first invocation. This was done to ensure that the customers can +receive logs for their first invocation. However, this was causing issues in some cases where the extension was not able to receive the first invocation +and times out. + +As this occurs more often, the invocation numbers in Axiom were significantly lower than the CloudWatch invocations. + + +## Decision + +- Decided to remove the blocking mechanism and allow the extension to start processing logs as soon as it is invoked. This will ensure that the customers receive any incoming logs without any delay. Removal of the first invocation and the channel associated with it simplifies the extension and simplifies the workflow. +- We had a function that checks if we should flush or not based on last time of flush. I replaced this function with a simple ticker instead, the ticker will tick every 1s and will flush the logs if the buffer is not empty. This will ensure that we are not blocking the logs for a long time. +- We had two Axiom clients, one that retries and one that doesn't. I think it complicates things and we should have only one client that retries. We still need a mechanism to prevent infinite retries though, a circuit breaker or something. + + + diff --git a/flusher/flusher.go b/flusher/flusher.go index c4810ab..3139f22 100644 --- a/flusher/flusher.go +++ b/flusher/flusher.go @@ -6,7 +6,6 @@ import ( "log" "os" "sync" - "time" "github.com/axiomhq/axiom-go/axiom" "github.com/axiomhq/axiom-go/axiom/ingest" @@ -24,11 +23,9 @@ const ( // Axiom Config var ( - axiomToken = os.Getenv("AXIOM_TOKEN") - axiomDataset = os.Getenv("AXIOM_DATASET") - batchSize = 1000 - flushInterval = 1 * time.Second - logger *zap.Logger + axiomToken = os.Getenv("AXIOM_TOKEN") + axiomDataset = os.Getenv("AXIOM_DATASET") + logger *zap.Logger ) func init() { @@ -36,11 +33,9 @@ func init() { } type Axiom struct { - client *axiom.Client - retryClient *axiom.Client - events []axiom.Event - eventsLock sync.Mutex - lastFlushTime time.Time + client *axiom.Client + events []axiom.Event + eventsLock sync.Mutex } func New() (*Axiom, error) { @@ -53,11 +48,6 @@ func New() (*Axiom, error) { axiom.SetUserAgent(fmt.Sprintf("axiom-lambda-extension/%s", version.Get())), } - retryClient, err := axiom.NewClient(opts...) - if err != nil { - return nil, err - } - opts = append(opts, axiom.SetNoRetry()) client, err := axiom.NewClient(opts...) if err != nil { @@ -65,21 +55,13 @@ func New() (*Axiom, error) { } f := &Axiom{ - client: client, - retryClient: retryClient, - events: make([]axiom.Event, 0), + client: client, + events: make([]axiom.Event, 0), } return f, nil } -func (f *Axiom) ShouldFlush() bool { - f.eventsLock.Lock() - defer f.eventsLock.Unlock() - - return len(f.events) > batchSize || f.lastFlushTime.IsZero() || time.Since(f.lastFlushTime) > flushInterval -} - func (f *Axiom) Queue(event axiom.Event) { f.eventsLock.Lock() defer f.eventsLock.Unlock() @@ -94,32 +76,22 @@ func (f *Axiom) QueueEvents(events []axiom.Event) { f.events = append(f.events, events...) } -func (f *Axiom) Flush(opt RetryOpt) { +func (f *Axiom) Flush() { f.eventsLock.Lock() var batch []axiom.Event // create a copy of the batch, clear the original batch, f.events = f.events, []axiom.Event{} f.eventsLock.Unlock() - f.lastFlushTime = time.Now() if len(batch) == 0 { return } var res *ingest.Status var err error - if opt == Retry { - res, err = f.retryClient.IngestEvents(context.Background(), axiomDataset, batch) - } else { - res, err = f.client.IngestEvents(context.Background(), axiomDataset, batch) - } + res, err = f.client.IngestEvents(context.Background(), axiomDataset, batch) if err != nil { - if opt == Retry { - logger.Error("Failed to ingest events", zap.Error(err)) - } else { - logger.Error("Failed to ingest events (will try again with next event)", zap.Error(err)) - } // allow this batch to be retried again, put them back f.eventsLock.Lock() defer f.eventsLock.Unlock() diff --git a/main.go b/main.go index 8e038dc..243d6b6 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "os/signal" "path/filepath" "syscall" + "time" "github.com/peterbourgon/ff/v2/ffcli" "go.uber.org/zap" @@ -19,11 +20,9 @@ import ( ) var ( - runtimeAPI = os.Getenv("AWS_LAMBDA_RUNTIME_API") - crashOnAPIErr = os.Getenv("PANIC_ON_API_ERR") - extensionName = filepath.Base(os.Args[0]) - isFirstInvocation = true - runtimeDone = make(chan struct{}) + runtimeAPI = os.Getenv("AWS_LAMBDA_RUNTIME_API") + crashOnAPIErr = os.Getenv("PANIC_ON_API_ERR") + extensionName = filepath.Base(os.Args[0]) // API Port logsPort = "8080" @@ -62,6 +61,9 @@ func Run() error { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) defer stop() + ticker := time.NewTicker(1 * time.Second) // Flush every second + defer ticker.Stop() + axiom, err := flusher.New() if err != nil { // We don't want to exit with error, so that the extensions doesn't crash and crash the main function with it. @@ -73,7 +75,7 @@ func Run() error { } } - httpServer := server.New(logsPort, axiom, runtimeDone) + httpServer := server.New(logsPort, axiom) go httpServer.Run(ctx) var extensionClient *extension.Client @@ -115,7 +117,7 @@ func Run() error { // Make sure we flush with retry on exit defer func() { flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) { - client.Flush(flusher.Retry) + client.Flush() }) }() @@ -124,6 +126,10 @@ func Run() error { case <-ctx.Done(): logger.Info("Context done", zap.Error(ctx.Err())) return nil + case <-ticker.C: + flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) { + axiom.Flush() + }) default: res, err := extensionClient.NextEvent(ctx, extensionName) if err != nil { @@ -131,24 +137,6 @@ func Run() error { return err } - // On every event received, check if we should flush - flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) { - if client.ShouldFlush() { - // No retry, we'll try again with the next event - client.Flush(flusher.NoRetry) - } - }) - - // Wait for the first invocation to finish (receive platform.runtimeDone log), then flush - if isFirstInvocation { - <-runtimeDone - isFirstInvocation = false - flusher.SafelyUseAxiomClient(axiom, func(client *flusher.Axiom) { - // No retry, we'll try again with the next event - client.Flush(flusher.NoRetry) - }) - } - if res.EventType == "SHUTDOWN" { _ = httpServer.Shutdown() return nil diff --git a/server/server.go b/server/server.go index 499e4ca..3d11b22 100644 --- a/server/server.go +++ b/server/server.go @@ -21,8 +21,7 @@ import ( ) var ( - logger *zap.Logger - firstInvocationDone = false + logger *zap.Logger ) // lambda environment variables @@ -52,8 +51,8 @@ func init() { } } -func New(port string, axiom *flusher.Axiom, runtimeDone chan struct{}) *axiomHttp.Server { - s, err := axiomHttp.NewServer(fmt.Sprintf(":%s", port), httpHandler(axiom, runtimeDone)) +func New(port string, axiom *flusher.Axiom) *axiomHttp.Server { + s, err := axiomHttp.NewServer(fmt.Sprintf(":%s", port), httpHandler(axiom)) if err != nil { logger.Error("Error creating server", zap.Error(err)) return nil @@ -62,7 +61,7 @@ func New(port string, axiom *flusher.Axiom, runtimeDone chan struct{}) *axiomHtt return s } -func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc { +func httpHandler(ax *flusher.Axiom) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { @@ -77,12 +76,11 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc return } - notifyRuntimeDone := false requestID := "" for _, e := range events { e["message"] = "" - // if reocrd key exists, extract the requestId and message from it + // if record key exists, extract the requestId and message from it if rec, ok := e["record"]; ok { if record, ok := rec.(map[string]any); ok { // capture requestId and set it if it exists @@ -110,11 +108,6 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc "requestId": requestID, } } - - // decide if the handler should notify the extension that the runtime is done - if e["type"] == "platform.runtimeDone" && !firstInvocationDone { - notifyRuntimeDone = true - } } // queue all the events at once to prevent locking and unlocking the mutex @@ -122,13 +115,5 @@ func httpHandler(ax *flusher.Axiom, runtimeDone chan struct{}) http.HandlerFunc flusher.SafelyUseAxiomClient(ax, func(client *flusher.Axiom) { client.QueueEvents(events) }) - - // inform the extension that platform.runtimeDone event has been received - if notifyRuntimeDone { - runtimeDone <- struct{}{} - firstInvocationDone = true - // close the channel since it will not be longer used - close(runtimeDone) - } } } diff --git a/version/version.go b/version/version.go index 45e2ef1..eced1bf 100644 --- a/version/version.go +++ b/version/version.go @@ -1,7 +1,7 @@ package version // manually set constant version -const version string = "v11" +const version string = "v12" // Get returns the Go module version of the axiom-go module. func Get() string {