Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/operator-framework/ansible-operator-plugins
go 1.23.6

require (
github.com/fsnotify/fsnotify v1.8.0
github.com/go-logr/logr v1.4.2
github.com/kr/text v0.2.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.2
Expand Down Expand Up @@ -42,7 +43,6 @@ require (
github.com/emicklei/go-restful/v3 v3.11.2 // indirect
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
Expand Down
223 changes: 223 additions & 0 deletions internal/ansible/runner/eventapi/fileapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright 2018 The Operator-SDK Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package eventapi

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"

"github.com/fsnotify/fsnotify"
"github.com/go-logr/logr"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

// FileEventReceiver watches ansible-runner artifact files for events
type FileEventReceiver struct {
// Events is the channel used to send JobEvents back to the runner
Events chan JobEvent

// ArtifactPath is the path where ansible-runner writes artifact files
ArtifactPath string

// stopped indicates if this receiver has permanently stopped receiving events
stopped bool

// mutex controls access to the "stopped" bool
mutex sync.RWMutex

// ident is the unique identifier for a particular run of ansible-runner
ident string

// logger holds a logger that has some fields already set
logger logr.Logger

// errChan is a channel for errors
errChan chan<- error

Comment on lines +51 to +53
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent goroutine hangs when sending errors (nil/blocked errChan).

Direct sends to a possibly nil or unbuffered errChan can deadlock the watcher. Guard nil and use a non-blocking send with logging fallback.

Apply:

-	// errChan is a channel for errors
-	errChan chan<- error
+	// errChan is a channel for errors (optional; non-blocking sends)
+	errChan chan<- error
-		f.errChan <- fmt.Errorf("failed to create job_events directory: %v", err)
+		f.trySendErr(fmt.Errorf("failed to create job_events directory: %v", err))
-		f.errChan <- fmt.Errorf("failed to create file watcher: %v", err)
+		f.trySendErr(fmt.Errorf("failed to create file watcher: %v", err))
-	if err := watcher.Add(jobEventsDir); err != nil {
-		f.errChan <- fmt.Errorf("failed to watch job_events directory: %v", err)
+	if err := watcher.Add(jobEventsDir); err != nil {
+		f.trySendErr(fmt.Errorf("failed to watch job_events directory: %v", err))
-			f.errChan <- fmt.Errorf("file watcher error: %v", err)
+			f.trySendErr(fmt.Errorf("file watcher error: %v", err))

Add helper (outside hunk; place below imports or as a method near Close):

func (r *FileEventReceiver) trySendErr(err error) {
	if r == nil || err == nil {
		return
	}
	if r.errChan == nil {
		r.logger.Error(err, "fileapi error")
		return
	}
	select {
	case r.errChan <- err:
	default:
		r.logger.Error(err, "fileapi error channel blocked; dropping")
	}
}

Also applies to: 99-101, 107-110, 113-116, 131-136

🤖 Prompt for AI Agents
In internal/ansible/runner/eventapi/fileapi.go around lines 51-53 (and also
update usages at 99-101, 107-110, 113-116, 131-136), direct sends to f.errChan
can block forever if errChan is nil, or if it is unbuffered/full and nothing is
receiving; add a helper method named trySendErr on *FileEventReceiver (place
below imports or near Close) that checks for nil receiver or error, logs and
returns if errChan is nil, and otherwise performs a non-blocking send (select
with default) that logs when the channel is blocked; replace all direct sends to
f.errChan in the indicated line ranges with calls to f.trySendErr(err) (the
method receiver in watchJobEvents is named f, not r).

// ctx is the context for cancellation
ctx context.Context
cancelFunc context.CancelFunc

// processedFiles keeps track of which files have been processed
processedFiles map[string]bool
processMutex sync.Mutex

// wg tracks goroutines for clean shutdown
wg sync.WaitGroup
}

// NewFileEventReceiver builds a FileEventReceiver for the ansible-runner run
// identified by ident and immediately launches the watchJobEvents goroutine
// for it.
//
// artifactPath is stored as ArtifactPath, and errChan is the channel on which
// the watcher goroutine reports its errors. The receiver gets its own
// cancellable context derived from context.Background(); Close cancels it.
// The returned error is always nil in the current implementation.
func NewFileEventReceiver(ident string, artifactPath string, errChan chan<- error) (*FileEventReceiver, error) {
	r := &FileEventReceiver{
		ArtifactPath:   artifactPath,
		ident:          ident,
		errChan:        errChan,
		processedFiles: map[string]bool{},
		// Events is buffered with room for 1000 pending JobEvents.
		Events: make(chan JobEvent, 1000),
		logger: logf.Log.WithName("fileapi").WithValues("job", ident),
	}
	r.ctx, r.cancelFunc = context.WithCancel(context.Background())

	// Register the watcher with the WaitGroup before starting it, so the
	// counter is already non-zero by the time anyone can call Close.
	r.wg.Add(1)
	go r.watchJobEvents()

	return r, nil
}

// watchJobEvents monitors the job_events directory for new files
func (f *FileEventReceiver) watchJobEvents() {
defer f.wg.Done()
defer close(f.Events)

// Watch the job_events directory
// Ansible-runner writes artifacts to {inputDir}/artifacts/{ident}/job_events
jobEventsDir := filepath.Join(f.ArtifactPath, "artifacts", f.ident, "job_events")

// Ensure directory exists before watching
if err := os.MkdirAll(jobEventsDir, 0755); err != nil {
f.errChan <- fmt.Errorf("failed to create job_events directory: %v", err)
return
}

f.logger.Info("Starting file-based event receiver", "jobEventsDir", jobEventsDir)

// Watch for new files
watcher, err := fsnotify.NewWatcher()
if err != nil {
f.errChan <- fmt.Errorf("failed to create file watcher: %v", err)
return
}
defer watcher.Close()

if err := watcher.Add(jobEventsDir); err != nil {
f.errChan <- fmt.Errorf("failed to watch job_events directory: %v", err)
return
}

for {
Comment on lines +113 to +118
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Process pre-existing JSON files to avoid missing early events.

If ansible-runner wrote events before the watcher was added, they’re currently skipped.

 	if err := watcher.Add(jobEventsDir); err != nil {
 		f.trySendErr(fmt.Errorf("failed to watch job_events directory: %v", err))
 		return
 	}
 
+	// Process any pre-existing JSON files (ordered for stable replay)
+	if entries, err := os.ReadDir(jobEventsDir); err != nil {
+		f.trySendErr(fmt.Errorf("failed to list job_events: %v", err))
+	} else {
+		sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() })
+		for _, e := range entries {
+			if !e.IsDir() && filepath.Ext(e.Name()) == ".json" {
+				f.processEventFile(filepath.Join(jobEventsDir, e.Name()))
+			}
+		}
+	}
+
 	for {

Also add import:

 import (
 	"context"
 	"encoding/json"
 	"fmt"
 	"os"
 	"path/filepath"
+	"sort"
 	"sync"
 	"time"

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In internal/ansible/runner/eventapi/fileapi.go around lines 113 to 118, right
after watcher.Add(jobEventsDir) succeeds, iterate the jobEventsDir for any
pre-existing JSON event files (e.g., via os.ReadDir + filepath.Ext == ".json"),
sort them by name, and feed each one into the same processing path the watcher
uses (processEventFile) while logging and continuing on per-file errors; then
enter the watch loop as before. Scanning after the watch is registered (as in
the diff above) avoids missing files created between the scan and the watch;
files seen both ways are de-duplicated by processedFiles. Also add the one
missing import: "sort" ("os" and "path/filepath" are already imported).

select {
case event, ok := <-watcher.Events:
if !ok {
return
}
// Process CREATE and WRITE events for JSON files
if (event.Op&fsnotify.Create == fsnotify.Create ||
event.Op&fsnotify.Write == fsnotify.Write) &&
filepath.Ext(event.Name) == ".json" {
time.Sleep(100 * time.Millisecond) // Brief delay to ensure file is fully written
f.processEventFile(event.Name)
}
case err, ok := <-watcher.Errors:
Comment on lines +125 to +131
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle rename events to avoid missing files written via atomic rename.

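Some writers do temp-write+rename; only listening for Create/Write can miss those. (Note: fsnotify reports a Rename event against the old path and emits a Create event for the destination name, so a file renamed into job_events should already arrive as Create; verify on the target platform that the extra Rename check is actually needed.)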

-			// Process CREATE and WRITE events for JSON files
-			if (event.Op&fsnotify.Create == fsnotify.Create ||
-				event.Op&fsnotify.Write == fsnotify.Write) &&
+			// Process CREATE, WRITE, and RENAME events for JSON files
+			if (event.Op&fsnotify.Create == fsnotify.Create ||
+				event.Op&fsnotify.Write == fsnotify.Write ||
+				event.Op&fsnotify.Rename == fsnotify.Rename) &&
 				filepath.Ext(event.Name) == ".json" {
 				time.Sleep(100 * time.Millisecond) // Brief delay to ensure file is fully written
 				f.processEventFile(event.Name)
 			}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (event.Op&fsnotify.Create == fsnotify.Create ||
event.Op&fsnotify.Write == fsnotify.Write) &&
filepath.Ext(event.Name) == ".json" {
time.Sleep(100 * time.Millisecond) // Brief delay to ensure file is fully written
f.processEventFile(event.Name)
}
case err, ok := <-watcher.Errors:
// Process CREATE, WRITE, and RENAME events for JSON files
if (event.Op&fsnotify.Create == fsnotify.Create ||
event.Op&fsnotify.Write == fsnotify.Write ||
event.Op&fsnotify.Rename == fsnotify.Rename) &&
filepath.Ext(event.Name) == ".json" {
time.Sleep(100 * time.Millisecond) // Brief delay to ensure file is fully written
f.processEventFile(event.Name)
}
case err, ok := <-watcher.Errors:

if !ok {
return
}
f.errChan <- fmt.Errorf("file watcher error: %v", err)
case <-f.ctx.Done():
f.logger.V(1).Info("Context cancelled")
return
}
}
}

// processEventFile reads and parses a single event file
func (r *FileEventReceiver) processEventFile(filename string) bool {
// Check if already processed
r.processMutex.Lock()
if r.processedFiles[filename] {
r.processMutex.Unlock()
r.logger.Info("Already processed file", "file", filename)
return true
}
r.processMutex.Unlock()

// Add small delay to ensure file is fully written
time.Sleep(50 * time.Millisecond)

file, err := os.Open(filename)
if err != nil {
r.logger.V(2).Info("Could not open event file (may not be ready)", "file", filename)
return false
}
defer file.Close()

// Parse JSON event from file
var event JobEvent
decoder := json.NewDecoder(file)
if err := decoder.Decode(&event); err != nil {
// Skip files that aren't valid JSON (might be partial writes)
r.logger.V(2).Info("Could not parse event file (may be incomplete)", "file", filename, "error", err)
return false
}
Comment on lines +154 to +171
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Replace fixed sleeps with a short retry loop for file stability.

Transient open/parse failures from partial writes are common; retry briefly instead of dropping.

-	// Add small delay to ensure file is fully written
-	time.Sleep(50 * time.Millisecond)
-
-	file, err := os.Open(filename)
-	if err != nil {
-		r.logger.V(2).Info("Could not open event file (may not be ready)", "file", filename)
-		return false
-	}
-	defer file.Close()
-
-	// Parse JSON event from file
-	var event JobEvent
-	decoder := json.NewDecoder(file)
-	if err := decoder.Decode(&event); err != nil {
-		// Skip files that aren't valid JSON (might be partial writes)
-		r.logger.V(2).Info("Could not parse event file (may be incomplete)", "file", filename, "error", err)
-		return false
-	}
+	// Retry open+decode for up to ~1s to allow writer to finish
+	var event JobEvent
+	deadline := time.Now().Add(1 * time.Second)
+	for {
+		file, err := os.Open(filename)
+		if err == nil {
+			decoder := json.NewDecoder(file)
+			if derr := decoder.Decode(&event); derr == nil {
+				file.Close()
+				break
+			} else {
+				r.logger.V(3).Info("JSON decode not ready", "file", filename, "error", derr)
+			}
+			file.Close()
+		} else {
+			r.logger.V(3).Info("File not ready", "file", filename, "error", err)
+		}
+		if time.Now().After(deadline) {
+			r.logger.V(2).Info("Giving up on event file", "file", filename)
+			return false
+		}
+		time.Sleep(50 * time.Millisecond)
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Add small delay to ensure file is fully written
time.Sleep(50 * time.Millisecond)
file, err := os.Open(filename)
if err != nil {
r.logger.V(2).Info("Could not open event file (may not be ready)", "file", filename)
return false
}
defer file.Close()
// Parse JSON event from file
var event JobEvent
decoder := json.NewDecoder(file)
if err := decoder.Decode(&event); err != nil {
// Skip files that aren't valid JSON (might be partial writes)
r.logger.V(2).Info("Could not parse event file (may be incomplete)", "file", filename, "error", err)
return false
}
// Retry open+decode for up to ~1s to allow writer to finish
var event JobEvent
deadline := time.Now().Add(1 * time.Second)
for {
file, err := os.Open(filename)
if err == nil {
decoder := json.NewDecoder(file)
if derr := decoder.Decode(&event); derr == nil {
file.Close()
break
} else {
r.logger.V(3).Info("JSON decode not ready", "file", filename, "error", derr)
}
file.Close()
} else {
r.logger.V(3).Info("File not ready", "file", filename, "error", err)
}
if time.Now().After(deadline) {
r.logger.V(2).Info("Giving up on event file", "file", filename)
return false
}
time.Sleep(50 * time.Millisecond)
}
🤖 Prompt for AI Agents
In internal/ansible/runner/eventapi/fileapi.go around lines 154 to 171, replace
the fixed time.Sleep and single open/decode attempt with a short, bounded retry
loop: attempt to open the file and decode JSON up to N times (e.g. 3-5 attempts)
with a small backoff (e.g. 20-50ms) between attempts, or until a short deadline
(~1s) as in the suggested diff above; on each attempt close the file explicitly
if it was opened (a defer inside the loop would not run until the function
returns), and only log and return false after all retries fail; on a successful
decode, leave the loop and continue with the existing processing (mark the file
as processed and send the event) rather than returning early. Ensure the final
open/decode failure is logged at the same verbosity as now.


// Mark as processed
r.processMutex.Lock()
r.processedFiles[filename] = true
r.processMutex.Unlock()

// Check if receiver is stopped
r.mutex.RLock()
stopped := r.stopped
r.mutex.RUnlock()

if stopped {
r.logger.V(1).Info("Receiver stopped, dropping event", "event", event.Event)
return false
}

// Send event to channel with timeout
timeout := time.NewTimer(10 * time.Second)
defer timeout.Stop()

select {
case r.Events <- event:
r.logger.V(2).Info("Processed event", "event", event.Event, "uuid", event.UUID)
if event.Event == EventPlaybookOnStats {
r.logger.Info("Successfully processed playbook_on_stats event")
}
return true
case <-timeout.C:
r.logger.Info("Timed out writing event to channel")
return true
case <-r.ctx.Done():
r.logger.V(1).Info("Context cancelled while writing event")
return false
}
}

// Close shuts the receiver down. It marks the receiver as stopped, cancels the
// receiver's context (when a cancel function is set), blocks until every
// goroutine tracked by wg has returned, and finally logs that the file event
// API has stopped.
func (r *FileEventReceiver) Close() {
	// Flip the stopped flag under the write lock; the lock is released as soon
	// as the closure returns.
	func() {
		r.mutex.Lock()
		defer r.mutex.Unlock()
		r.stopped = true
	}()

	// Signal goroutines selecting on ctx.Done() to exit.
	if cancel := r.cancelFunc; cancel != nil {
		cancel()
	}

	// Do not return until the tracked goroutines have finished.
	r.wg.Wait()

	r.logger.V(1).Info("File Event API stopped")
}
112 changes: 104 additions & 8 deletions internal/ansible/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -37,6 +38,38 @@ import (

var log = logf.Log.WithName("runner")

// eventReceiver is the common interface for both the HTTP-based and the
// file-based event receivers, so the runner can treat them interchangeably.
// It is implemented in this file by httpReceiverWrapper and fileReceiverWrapper.
type eventReceiver interface {
// Events returns the receive-only channel of JobEvents produced by the receiver.
Events() <-chan eventapi.JobEvent
// Close shuts the receiver down.
Close()
}

// httpReceiverWrapper wraps the HTTP-based EventReceiver so that it satisfies
// eventReceiver. The wrapped type exposes Events as a field rather than a
// method, hence the adapter.
type httpReceiverWrapper struct {
// receiver is the wrapped HTTP-based receiver.
receiver *eventapi.EventReceiver
}

// Events returns the wrapped receiver's Events channel as receive-only.
func (w *httpReceiverWrapper) Events() <-chan eventapi.JobEvent {
return w.receiver.Events
}

// Close delegates to the wrapped receiver's Close.
func (w *httpReceiverWrapper) Close() {
w.receiver.Close()
}

// fileReceiverWrapper wraps the file-based FileEventReceiver so that it
// satisfies eventReceiver. The wrapped type exposes Events as a field rather
// than a method, hence the adapter.
type fileReceiverWrapper struct {
// receiver is the wrapped file-based receiver.
receiver *eventapi.FileEventReceiver
}

// Events returns the wrapped receiver's Events channel as receive-only.
func (w *fileReceiverWrapper) Events() <-chan eventapi.JobEvent {
return w.receiver.Events
}

// Close delegates to the wrapped receiver's Close.
func (w *fileReceiverWrapper) Close() {
w.receiver.Close()
}

const (
// MaxRunnerArtifactsAnnotation - annotation used by a user to specify the max artifacts to keep
// in the runner directory. This will override the value provided by the watches file for a
Expand Down Expand Up @@ -199,10 +232,36 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri
// start the event receiver. We'll check errChan for an error after
// ansible-runner exits.
errChan := make(chan error, 1)
receiver, err := eventapi.New(ident, errChan)
if err != nil {
return nil, err

// Check if HTTP event API is disabled via environment variable
// useFileAPI := os.Getenv("ANSIBLE_RUNNER_USE_FILE_API") == "true"

// TODO, making always true
useFileAPI := true

var receiver eventReceiver

if useFileAPI {
Comment on lines +236 to +244
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Honor ANSIBLE_RUNNER_USE_FILE_API and fix misleading comment.

Currently the env var is ignored and file API is always used. Respect the flag (default to file API if you want the POC default) and update the comment.

Apply this diff:

-	// Check if HTTP event API is disabled via environment variable
-	// useFileAPI := os.Getenv("ANSIBLE_RUNNER_USE_FILE_API") == "true"
-
-	// TODO, making always true
-	useFileAPI := true
+	// Select event API via ANSIBLE_RUNNER_USE_FILE_API (true/1 => file API). Defaults to file API.
+	useFileAPI := true
+	if v, ok := os.LookupEnv("ANSIBLE_RUNNER_USE_FILE_API"); ok {
+		useFileAPI = v == "1" || strings.EqualFold(v, "true")
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Check if HTTP event API is disabled via environment variable
// useFileAPI := os.Getenv("ANSIBLE_RUNNER_USE_FILE_API") == "true"
// TODO, making always true
useFileAPI := true
var receiver eventReceiver
if useFileAPI {
// Select event API via ANSIBLE_RUNNER_USE_FILE_API (true/1 => file API). Defaults to file API.
useFileAPI := true
if v, ok := os.LookupEnv("ANSIBLE_RUNNER_USE_FILE_API"); ok {
useFileAPI = v == "1" || strings.EqualFold(v, "true")
}
var receiver eventReceiver
if useFileAPI {
🤖 Prompt for AI Agents
In internal/ansible/runner/runner.go around lines 236 to 244, the environment
flag ANSIBLE_RUNNER_USE_FILE_API is currently ignored and the comment is
misleading; restore honoring of the env var and update the comment. Read the env
var, normalize case, and set useFileAPI based on its value (treating empty as
true if you want the POC default), e.g. interpret "" or "true" as enabling the
file API and "false" as disabling it (remember to add a strings import if you
use strings.ToLower); then replace the hardcoded useFileAPI := true with that
logic and update the preceding comment to accurately describe that the flag
controls whether the file API is used and what the default behavior is.

// Use file-based event API
fmt.Println("Using file-base event API")
// File receiver should look in the inputDir where ansible-runner writes artifacts
artifactPath := filepath.Join("/tmp/ansible-operator/runner/", r.GVK.Group, r.GVK.Version, r.GVK.Kind,
u.GetNamespace(), u.GetName())
fileReceiver, err := eventapi.NewFileEventReceiver(ident, artifactPath, errChan)
if err != nil {
return nil, err
}
receiver = &fileReceiverWrapper{fileReceiver}
} else {
// Use existing HTTP-based event API
fmt.Println("Using HTTP-based event API")
httpReceiver, err := eventapi.New(ident, errChan)
if err != nil {
Comment on lines +245 to +259
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix log typo and use structured logging instead of fmt.Println.

"file-base" → "file-based". Also prefer controller-runtime logger for consistency.

Apply this diff:

-		// Use file-based event API
-		fmt.Println("Using file-base event API")
-		// File receiver should look in the inputDir where ansible-runner writes artifacts
-		artifactPath := filepath.Join("/tmp/ansible-operator/runner/", r.GVK.Group, r.GVK.Version, r.GVK.Kind,
-			u.GetNamespace(), u.GetName())
-		fileReceiver, err := eventapi.NewFileEventReceiver(ident, artifactPath, errChan)
+		// Use file-based event API
+		logger.Info("Using file-based event API")
+		// File receiver should look in the inputDir where ansible-runner writes artifacts
+		basePath := filepath.Join("/tmp/ansible-operator/runner/", r.GVK.Group, r.GVK.Version, r.GVK.Kind,
+			u.GetNamespace(), u.GetName())
+		fileReceiver, err := eventapi.NewFileEventReceiver(ident, basePath, errChan)
 			if err != nil {
 				return nil, err
 			}
 			receiver = &fileReceiverWrapper{fileReceiver}
 		} else {
 			// Use existing HTTP-based event API
-			fmt.Println("Using HTTP-based event API")
+			logger.Info("Using HTTP-based event API")
 			httpReceiver, err := eventapi.New(ident, errChan)

Additionally, reuse basePath for inputDir.Path a few lines below:

inputDir := inputdir.InputDir{
    Path: basePath,
    // ...
}
🤖 Prompt for AI Agents
In internal/ansible/runner/runner.go around lines 245 to 259, replace the
fmt.Println calls and typo: change the "Using file-base event API" message to
"Using file-based event API" and use the controller-runtime structured logger
(e.g., log := ctrl.Log.WithName("ansible-runner") / appropriate logger in scope)
to emit these messages instead of fmt.Println; also when constructing the
inputDir a few lines below, set inputDir.Path to reuse the previously computed
basePath (Path: basePath) rather than duplicating the literal, keeping other
fields the same.

return nil, err
}
receiver = &httpReceiverWrapper{httpReceiver}
}

inputDir := inputdir.InputDir{
Path: filepath.Join("/tmp/ansible-operator/runner/", r.GVK.Group, r.GVK.Version, r.GVK.Kind,
u.GetNamespace(), u.GetName()),
Expand All @@ -211,12 +270,20 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri
"K8S_AUTH_KUBECONFIG": kubeconfig,
"KUBECONFIG": kubeconfig,
},
Settings: map[string]string{
"runner_http_url": receiver.SocketPath,
"runner_http_path": receiver.URLPath,
},
CmdLine: r.ansibleArgs,
}

// Configure Settings based on the API type
if useFileAPI {
inputDir.Settings = map[string]string{}
} else {
if httpWrapper, ok := receiver.(*httpReceiverWrapper); ok {
inputDir.Settings = map[string]string{
"runner_http_url": httpWrapper.receiver.SocketPath,
"runner_http_path": httpWrapper.receiver.URLPath,
}
}
}
// If Path is a dir, assume it is a role path. Otherwise assume it's a
// playbook path
fi, err := os.Lstat(r.Path)
Expand Down Expand Up @@ -271,6 +338,35 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri
logger.Info("Ansible-runner exited successfully")
}

// For file-based API, give some time to process final event files
if _, isFileReceiver := receiver.(*fileReceiverWrapper); isFileReceiver {
logger.V(1).Info("Waiting for file-based event receiver to process final events")

// Check if artifacts directory exists and list contents
artifactsDir := filepath.Join(inputDir.Path, "artifacts", ident)
if entries, err := os.ReadDir(artifactsDir); err == nil {
logger.V(1).Info("Artifacts directory contents", "dir", artifactsDir, "entries", len(entries))
for _, entry := range entries {
logger.V(1).Info("Artifact entry", "name", entry.Name(), "isDir", entry.IsDir())
}
} else {
logger.Info("Could not read artifacts directory", "dir", artifactsDir, "error", err)
}

// Check job_events directory specifically
jobEventsDir := filepath.Join(inputDir.Path, "artifacts", ident, "job_events")
if entries, err := os.ReadDir(jobEventsDir); err == nil {
logger.V(1).Info("Job events directory contents", "dir", jobEventsDir, "entries", len(entries))
for _, entry := range entries {
logger.V(1).Info("Job event file", "name", entry.Name())
}
} else {
logger.Info("Could not read job_events directory", "dir", jobEventsDir, "error", err)
}

time.Sleep(5 * time.Second) // Increased delay for final file processing
}

Comment on lines +341 to +369
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Replace fixed sleep with a deterministic completion signal.

The 5s sleep is fragile and can either delay unnecessarily or still miss late writes. Prefer one of:

  • Close Events() after playbook_on_stats in FileEventReceiver and let callers range until close.
  • Or have FileEventReceiver expose a Done() <-chan struct{} fired after processing stats.

Interim mitigation (configurable delay) if you must keep a delay:

Apply this diff:

-			time.Sleep(5 * time.Second) // Increased delay for final file processing
+			if ms, err := strconv.Atoi(os.Getenv("RUNNER_FILEAPI_FLUSH_DELAY_MS")); err == nil && ms > 0 {
+				time.Sleep(time.Duration(ms) * time.Millisecond)
+			}

Note: requires strconv import (already present).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// For file-based API, give some time to process final event files
if _, isFileReceiver := receiver.(*fileReceiverWrapper); isFileReceiver {
logger.V(1).Info("Waiting for file-based event receiver to process final events")
// Check if artifacts directory exists and list contents
artifactsDir := filepath.Join(inputDir.Path, "artifacts", ident)
if entries, err := os.ReadDir(artifactsDir); err == nil {
logger.V(1).Info("Artifacts directory contents", "dir", artifactsDir, "entries", len(entries))
for _, entry := range entries {
logger.V(1).Info("Artifact entry", "name", entry.Name(), "isDir", entry.IsDir())
}
} else {
logger.Info("Could not read artifacts directory", "dir", artifactsDir, "error", err)
}
// Check job_events directory specifically
jobEventsDir := filepath.Join(inputDir.Path, "artifacts", ident, "job_events")
if entries, err := os.ReadDir(jobEventsDir); err == nil {
logger.V(1).Info("Job events directory contents", "dir", jobEventsDir, "entries", len(entries))
for _, entry := range entries {
logger.V(1).Info("Job event file", "name", entry.Name())
}
} else {
logger.Info("Could not read job_events directory", "dir", jobEventsDir, "error", err)
}
time.Sleep(5 * time.Second) // Increased delay for final file processing
}
// For file-based API, give some time to process final event files
if _, isFileReceiver := receiver.(*fileReceiverWrapper); isFileReceiver {
logger.V(1).Info("Waiting for file-based event receiver to process final events")
// Check if artifacts directory exists and list contents
artifactsDir := filepath.Join(inputDir.Path, "artifacts", ident)
if entries, err := os.ReadDir(artifactsDir); err == nil {
logger.V(1).Info("Artifacts directory contents", "dir", artifactsDir, "entries", len(entries))
for _, entry := range entries {
logger.V(1).Info("Artifact entry", "name", entry.Name(), "isDir", entry.IsDir())
}
} else {
logger.Info("Could not read artifacts directory", "dir", artifactsDir, "error", err)
}
// Check job_events directory specifically
jobEventsDir := filepath.Join(inputDir.Path, "artifacts", ident, "job_events")
if entries, err := os.ReadDir(jobEventsDir); err == nil {
logger.V(1).Info("Job events directory contents", "dir", jobEventsDir, "entries", len(entries))
for _, entry := range entries {
logger.V(1).Info("Job event file", "name", entry.Name())
}
} else {
logger.Info("Could not read job_events directory", "dir", jobEventsDir, "error", err)
}
if ms, err := strconv.Atoi(os.Getenv("RUNNER_FILEAPI_FLUSH_DELAY_MS")); err == nil && ms > 0 {
time.Sleep(time.Duration(ms) * time.Millisecond)
}
}
🤖 Prompt for AI Agents
In internal/ansible/runner/runner.go around lines 341-369, replace the fixed 5s
time.Sleep used to wait for the file-based event receiver with a deterministic
completion signal: wait for the FileEventReceiver to signal completion
(preferred) by ranging over its Events() until closed or by waiting on a Done()
<-chan struct{} that the fileReceiverWrapper fires after processing
playbook_on_stats; if neither exists, modify fileReceiverWrapper to close the
Events() channel or add and close a Done() channel when final stats are
processed, then block here on that signal instead of sleeping; as an interim
mitigation, make the wait configurable (e.g., a context-aware timeout or
configurable sleep duration) and use that value instead of a hardcoded 5s so you
can tune without code changes.

receiver.Close()
err = <-errChan
// http.Server returns this in the case of being closed cleanly
Expand All @@ -297,7 +393,7 @@ func (r *runner) Run(ident string, u *unstructured.Unstructured, kubeconfig stri
}()

return &runResult{
events: receiver.Events,
events: receiver.Events(),
inputDir: &inputDir,
ident: ident,
}, nil
Expand Down
Loading