Conversation


@chiragkyal chiragkyal commented Jul 22, 2025

Description of the change:

This POC introduces a file-based event watcher as an alternative to the existing HTTP-based event approach (ansible-runner-http) for communication between ansible-runner and the operator.

Key Changes:

  • New file-based event API (internal/ansible/runner/eventapi/fileapi.go):
  • Introduces a common eventReceiver wrapper interface shared by the HTTP-based and file-based implementations
  • Monitors the directory structure {inputDir}/artifacts/{ident}/job_events/*.json using fsnotify; ansible-runner writes event files there, which are parsed and forwarded to the same event channel used by the HTTP-based approach (see the sketch below)
  • Adds environment variable configuration (ANSIBLE_RUNNER_USE_FILE_API)
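
A minimal sketch of that watch loop, assuming fsnotify v1.8 and a generic map in place of the package's JobEvent type; the function and channel names are illustrative, not this PR's actual code:

func watchJobEvents(jobEventsDir string, events chan<- map[string]interface{}) error {
    // Imports assumed: encoding/json, os, path/filepath, github.com/fsnotify/fsnotify.
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        return err
    }
    defer watcher.Close()

    if err := watcher.Add(jobEventsDir); err != nil {
        return err
    }

    for {
        select {
        case ev, ok := <-watcher.Events:
            if !ok {
                return nil
            }
            // ansible-runner writes one JSON file per job event.
            if ev.Op&(fsnotify.Create|fsnotify.Write) != 0 && filepath.Ext(ev.Name) == ".json" {
                data, err := os.ReadFile(ev.Name)
                if err != nil {
                    continue // file may not exist yet or is mid-write
                }
                var event map[string]interface{}
                if err := json.Unmarshal(data, &event); err != nil {
                    continue // partial write; a later Write event retries
                }
                events <- event
            }
        case err, ok := <-watcher.Errors:
            if !ok {
                return nil
            }
            _ = err // the real receiver reports this on an error channel
        }
    }
}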

Motivation for the change:

Remove the dependency on the ansible-runner-http repository because it has been archived upstream.

TODO:

  • Reduce the latency spent waiting for all events to be fully written by ansible-runner.
  • The MaxRunnerArtifactsAnnotation ("ansible.sdk.operatorframework.io/max-runner-artifacts") annotation may influence the number of artifact directories.

/hold

Summary by CodeRabbit

  • New Features
    • Introduced file-based event processing for Ansible runs, streaming job events directly from artifacts with improved delivery and graceful shutdown.
  • Chores
    • Promoted fsnotify to a direct dependency.
    • Temporarily disabled a vulnerability check step in the build requirements pipeline.
  • Tests
    • CI now captures controller manager logs after e2e scaffolding tests for better diagnostics.

@openshift-ci openshift-ci bot added do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. labels Jul 22, 2025

openshift-ci bot commented Jul 22, 2025

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all


openshift-ci bot commented Jul 22, 2025

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: chiragkyal

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@openshift-ci openshift-ci bot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Jul 22, 2025
@chiragkyal
Member Author

/test all

3 similar comments
@chiragkyal
Member Author

/test all

@chiragkyal
Member Author

/test all

@chiragkyal
Member Author

/test all

@chiragkyal chiragkyal force-pushed the poc-remove-http-plugin branch from d502eae to f255c14 Compare July 28, 2025 11:29
Signed-off-by: chiragkyal <[email protected]>
@chiragkyal
Member Author

/test all

1 similar comment
@chiragkyal
Member Author

/test all


openshift-ci bot commented Sep 4, 2025

@chiragkyal: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name                   | Commit  | Details | Required | Rerun command
ci/prow/images              | 7513a78 | link    | true     | /test images
ci/prow/verify-requirements | 7513a78 | link    | true     | /test verify-requirements
ci/prow/e2e-ansible         | 7513a78 | link    | true     | /test e2e-ansible

Full PR test history. Your PR dashboard.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

@chiragkyal
Member Author

@coderabbitai review


coderabbitai bot commented Sep 8, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


coderabbitai bot commented Sep 8, 2025

Walkthrough

Promotes fsnotify to a direct dependency. Adds a new file-based ansible-runner event receiver using fsnotify. Integrates the file-based receiver into the runner flow with a new receiver interface and wrappers, defaulting to the file API. Comments out a Pipenv vulnerability check in the Dockerfile. Adds a log-dump step to an e2e test script.

Changes

Cohort / File(s) — Summary

  • Dependencies (go.mod): Promoted github.com/fsnotify/fsnotify v1.8.0 from an indirect to a direct dependency; no other changes.
  • File-based event receiver (internal/ansible/runner/eventapi/fileapi.go): New FileEventReceiver watching artifacts/{ident}/job_events with fsnotify; reads JSON events, deduplicates, emits on a buffered Events channel; supports context cancellation, error-channel reporting, and Close(). A rough sketch of the type follows this table.
  • Runner integration and control flow (internal/ansible/runner/runner.go): Added receiver interface and wrappers; defaulted to the file-based API; constructs the artifact path, wires receiver.Events() into RunResult; conditional HTTP settings; post-run artifact/job_events inspection and delay before receiver close; extra debug logging.
  • CI/Build scripts (openshift/Dockerfile.requirements, openshift/ci/tests/e2e-ansible-scaffolding.sh): Commented out pipenv check --ignore 71064; added an unconditional kubectl logs dump for the controller manager after tests.
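
A rough shape of the receiver inferred from this summary; every field name below is an assumption, not the PR's actual code:

// Sketch of FileEventReceiver as described above (fields assumed).
// Imports assumed: context, sync, github.com/go-logr/logr.
type FileEventReceiver struct {
    // Events is the buffered channel consumed by the runner
    // (capacity 1000 per the sequence diagram below).
    Events chan JobEvent

    logger         logr.Logger
    errChan        chan<- error       // optional error reporting
    cancel         context.CancelFunc // stops the watch goroutine
    wg             sync.WaitGroup     // waited on by Close()
    processMutex   sync.Mutex
    processedFiles map[string]bool // dedup of already-processed files
}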

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Operator
  participant Runner as Ansible Runner
  participant Receiver as FileEventReceiver
  participant FS as fsnotify Watcher
  participant Artifacts as artifacts/.../job_events
  participant Events as Events Channel

  Operator->>Runner: Start playbook (file API default)
  Runner->>Receiver: NewFileEventReceiver(ident, artifactPath, errChan)
  Receiver->>FS: Watch job_events dir
  Note right of Receiver: Buffered Events channel (cap 1000)

  loop On FS events (create/write)
    FS-->>Receiver: File event (JSON)
    Receiver->>Artifacts: Read JSON (after brief delay)
    Receiver->>Receiver: Deduplicate processed files
    alt Send succeeds within timeout
      Receiver-->>Events: JobEvent
    else Timeout or stopped
      Receiver-->>Events: (no send)
    end
  end

  Runner->>Runner: Run ansible-runner
  Runner->>Artifacts: Post-run inspect dirs
  Runner->>Receiver: Close()
  Receiver->>FS: Stop watch
  Receiver-->>Runner: Goroutine exits
  Note over Runner,Events: HTTP path remains available via wrapper but not selected by default

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes



@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/ansible/runner/runner.go (1)

370-376: Potential goroutine leak: blocking receive on errChan.

File-based receiver may never send an error, causing this goroutine to block forever. Guard the receive with a timeout and handle closed channel.

Apply this diff:

-		receiver.Close()
-		err = <-errChan
-		// http.Server returns this in the case of being closed cleanly
-		if err != nil && err != http.ErrServerClosed {
-			logger.Error(err, "Error from event API")
-		}
+		receiver.Close()
+		select {
+		case err, ok := <-errChan:
+			// http.Server returns ErrServerClosed on clean close
+			if ok && err != nil && err != http.ErrServerClosed {
+				logger.Error(err, "Error from event API")
+			}
+		case <-time.After(2 * time.Second):
+			logger.V(1).Info("Timed out waiting for event API to close")
+		}
🧹 Nitpick comments (4)
internal/ansible/runner/eventapi/fileapi.go (2)

58-64: Avoid unbounded growth of processedFiles.

Dedup map grows with every event. Clear it when the terminal stats event is seen.

 		if event.Event == EventPlaybookOnStats {
 			r.logger.Info("Successfully processed playbook_on_stats event")
+			// Reset dedup map after terminal event
+			r.processMutex.Lock()
+			r.processedFiles = make(map[string]bool)
+			r.processMutex.Unlock()
 		}

Also applies to: 195-197


71-75: Consider tuning backpressure behavior.

Buffer of 1000 + 10s send timeout may still drop events under burst. Expose buffer via env/param and consider a bounded worker to drain to the consumer.

Example:

-		Events:         make(chan JobEvent, 1000),
+		Events:         make(chan JobEvent, getEnvInt("ANSIBLE_RUNNER_EVENTS_BUFFER", 1000)),

And replace the timeout send with a small bounded retry/backoff before drop.

Also applies to: 188-205
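
getEnvInt referenced above is not an existing helper; a minimal sketch of it (assumes os and strconv are imported):

// getEnvInt reads a positive integer from the environment,
// falling back to def when unset or invalid. Hypothetical helper.
func getEnvInt(key string, def int) int {
    v, ok := os.LookupEnv(key)
    if !ok {
        return def
    }
    n, err := strconv.Atoi(v)
    if err != nil || n <= 0 {
        return def
    }
    return n
}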

internal/ansible/runner/runner.go (2)

41-45: Document the interface contract (idempotent Close + channel lifecycle).

Please document whether Close() must be idempotent and whether it closes the Events() channel. This avoids ambiguity for callers and implementors of both wrappers.
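
One way the documented contract could read; the method set matches how the receiver is used in this PR, while the doc-comment semantics are only suggestions:

// eventReceiver abstracts the HTTP- and file-based event APIs.
type eventReceiver interface {
    // Events returns the channel of job events. Implementations
    // should deliver the terminal playbook_on_stats event before
    // shutting down.
    Events() chan eventapi.JobEvent
    // Close stops the receiver. It must be idempotent, and callers
    // must not assume it closes the channel returned by Events().
    Close()
}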


276-286: Avoid setting empty Settings map; set only for HTTP path.

Minor clean-up; keeps InputDir semantics minimal.

Apply this diff:

-	// Configure Settings based on the API type
-	if useFileAPI {
-		inputDir.Settings = map[string]string{}
-	} else {
+	// Configure Settings only for HTTP API
+	if !useFileAPI {
 		if httpWrapper, ok := receiver.(*httpReceiverWrapper); ok {
 			inputDir.Settings = map[string]string{
 				"runner_http_url":  httpWrapper.receiver.SocketPath,
 				"runner_http_path": httpWrapper.receiver.URLPath,
 			}
 		}
 	}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Cache: Disabled due to data retention organization setting

Knowledge Base: Disabled due to Reviews > Disable Knowledge Base setting

📥 Commits

Reviewing files that changed from the base of the PR and between fd1eecd and 7513a78.

📒 Files selected for processing (5)
  • go.mod (1 hunks)
  • internal/ansible/runner/eventapi/fileapi.go (1 hunks)
  • internal/ansible/runner/runner.go (6 hunks)
  • openshift/Dockerfile.requirements (1 hunks)
  • openshift/ci/tests/e2e-ansible-scaffolding.sh (1 hunks)
🔇 Additional comments (4)
go.mod (1)

6-6: Approve fsnotify promotion to direct dependency. Verified direct imports in internal/ansible/runner/eventapi/fileapi.go and ran go mod tidy to sync go.sum.

internal/ansible/runner/runner.go (3)

47-59: HTTP receiver wrapper looks good.


60-72: File receiver wrapper looks good.


395-399: RunResult wiring LGTM.

Returning receiver.Events() through RunResult keeps callers agnostic to the underlying API.

Comment on lines +51 to +53
// errChan is a channel for errors
errChan chan<- error


⚠️ Potential issue

Prevent goroutine hangs when sending errors (nil/blocked errChan).

Direct sends to a possibly nil or unbuffered errChan can deadlock the watcher. Guard nil and use a non-blocking send with logging fallback.

Apply:

-	// errChan is a channel for errors
-	errChan chan<- error
+	// errChan is a channel for errors (optional; non-blocking sends)
+	errChan chan<- error
-		f.errChan <- fmt.Errorf("failed to create job_events directory: %v", err)
+		f.trySendErr(fmt.Errorf("failed to create job_events directory: %v", err))
-		f.errChan <- fmt.Errorf("failed to create file watcher: %v", err)
+		f.trySendErr(fmt.Errorf("failed to create file watcher: %v", err))
-	if err := watcher.Add(jobEventsDir); err != nil {
-		f.errChan <- fmt.Errorf("failed to watch job_events directory: %v", err)
+	if err := watcher.Add(jobEventsDir); err != nil {
+		f.trySendErr(fmt.Errorf("failed to watch job_events directory: %v", err))
-			f.errChan <- fmt.Errorf("file watcher error: %v", err)
+			f.trySendErr(fmt.Errorf("file watcher error: %v", err))

Add helper (outside hunk; place below imports or as a method near Close):

func (r *FileEventReceiver) trySendErr(err error) {
	if r == nil || err == nil {
		return
	}
	if r.errChan == nil {
		r.logger.Error(err, "fileapi error")
		return
	}
	select {
	case r.errChan <- err:
	default:
		r.logger.Error(err, "fileapi error channel blocked; dropping")
	}
}

Also applies to: 99-101, 107-110, 113-116, 131-136


Comment on lines +113 to +118
if err := watcher.Add(jobEventsDir); err != nil {
f.errChan <- fmt.Errorf("failed to watch job_events directory: %v", err)
return
}

for {

🛠️ Refactor suggestion

Process pre-existing JSON files to avoid missing early events.

If ansible-runner wrote events before the watcher was added, they’re currently skipped.

 	if err := watcher.Add(jobEventsDir); err != nil {
 		f.trySendErr(fmt.Errorf("failed to watch job_events directory: %v", err))
 		return
 	}
 
+	// Process any pre-existing JSON files (ordered for stable replay)
+	if entries, err := os.ReadDir(jobEventsDir); err != nil {
+		f.trySendErr(fmt.Errorf("failed to list job_events: %v", err))
+	} else {
+		sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() })
+		for _, e := range entries {
+			if !e.IsDir() && filepath.Ext(e.Name()) == ".json" {
+				f.processEventFile(filepath.Join(jobEventsDir, e.Name()))
+			}
+		}
+	}
+
 	for {

Also add import:

 import (
 	"context"
 	"encoding/json"
 	"fmt"
 	"os"
 	"path/filepath"
+	"sort"
 	"sync"
 	"time"

Committable suggestion skipped: line range outside the PR's diff.


Comment on lines +125 to +131
if (event.Op&fsnotify.Create == fsnotify.Create ||
event.Op&fsnotify.Write == fsnotify.Write) &&
filepath.Ext(event.Name) == ".json" {
time.Sleep(100 * time.Millisecond) // Brief delay to ensure file is fully written
f.processEventFile(event.Name)
}
case err, ok := <-watcher.Errors:

🛠️ Refactor suggestion

Handle rename events to avoid missing files written via atomic rename.

Some writers do temp-write+rename; only listening for Create/Write can miss those.

-			// Process CREATE and WRITE events for JSON files
-			if (event.Op&fsnotify.Create == fsnotify.Create ||
-				event.Op&fsnotify.Write == fsnotify.Write) &&
+			// Process CREATE, WRITE, and RENAME events for JSON files
+			if (event.Op&fsnotify.Create == fsnotify.Create ||
+				event.Op&fsnotify.Write == fsnotify.Write ||
+				event.Op&fsnotify.Rename == fsnotify.Rename) &&
 				filepath.Ext(event.Name) == ".json" {
 				time.Sleep(100 * time.Millisecond) // Brief delay to ensure file is fully written
 				f.processEventFile(event.Name)
 			}

Comment on lines +154 to +171
// Add small delay to ensure file is fully written
time.Sleep(50 * time.Millisecond)

file, err := os.Open(filename)
if err != nil {
r.logger.V(2).Info("Could not open event file (may not be ready)", "file", filename)
return false
}
defer file.Close()

// Parse JSON event from file
var event JobEvent
decoder := json.NewDecoder(file)
if err := decoder.Decode(&event); err != nil {
// Skip files that aren't valid JSON (might be partial writes)
r.logger.V(2).Info("Could not parse event file (may be incomplete)", "file", filename, "error", err)
return false
}

🛠️ Refactor suggestion

Replace fixed sleeps with a short retry loop for file stability.

Transient open/parse failures from partial writes are common; retry briefly instead of dropping.

-	// Add small delay to ensure file is fully written
-	time.Sleep(50 * time.Millisecond)
-
-	file, err := os.Open(filename)
-	if err != nil {
-		r.logger.V(2).Info("Could not open event file (may not be ready)", "file", filename)
-		return false
-	}
-	defer file.Close()
-
-	// Parse JSON event from file
-	var event JobEvent
-	decoder := json.NewDecoder(file)
-	if err := decoder.Decode(&event); err != nil {
-		// Skip files that aren't valid JSON (might be partial writes)
-		r.logger.V(2).Info("Could not parse event file (may be incomplete)", "file", filename, "error", err)
-		return false
-	}
+	// Retry open+decode for up to ~1s to allow writer to finish
+	var event JobEvent
+	deadline := time.Now().Add(1 * time.Second)
+	for {
+		file, err := os.Open(filename)
+		if err == nil {
+			decoder := json.NewDecoder(file)
+			if derr := decoder.Decode(&event); derr == nil {
+				file.Close()
+				break
+			} else {
+				r.logger.V(3).Info("JSON decode not ready", "file", filename, "error", derr)
+			}
+			file.Close()
+		} else {
+			r.logger.V(3).Info("File not ready", "file", filename, "error", err)
+		}
+		if time.Now().After(deadline) {
+			r.logger.V(2).Info("Giving up on event file", "file", filename)
+			return false
+		}
+		time.Sleep(50 * time.Millisecond)
+	}

Comment on lines +236 to +244
// Check if HTTP event API is disabled via environment variable
// useFileAPI := os.Getenv("ANSIBLE_RUNNER_USE_FILE_API") == "true"

// TODO, making always true
useFileAPI := true

var receiver eventReceiver

if useFileAPI {

🛠️ Refactor suggestion

Honor ANSIBLE_RUNNER_USE_FILE_API and fix misleading comment.

Currently the env var is ignored and file API is always used. Respect the flag (default to file API if you want the POC default) and update the comment.

Apply this diff:

-	// Check if HTTP event API is disabled via environment variable
-	// useFileAPI := os.Getenv("ANSIBLE_RUNNER_USE_FILE_API") == "true"
-
-	// TODO, making always true
-	useFileAPI := true
+	// Select event API via ANSIBLE_RUNNER_USE_FILE_API (true/1 => file API). Defaults to file API.
+	useFileAPI := true
+	if v, ok := os.LookupEnv("ANSIBLE_RUNNER_USE_FILE_API"); ok {
+		useFileAPI = v == "1" || strings.EqualFold(v, "true")
+	}

Comment on lines +245 to +259
// Use file-based event API
fmt.Println("Using file-base event API")
// File receiver should look in the inputDir where ansible-runner writes artifacts
artifactPath := filepath.Join("/tmp/ansible-operator/runner/", r.GVK.Group, r.GVK.Version, r.GVK.Kind,
u.GetNamespace(), u.GetName())
fileReceiver, err := eventapi.NewFileEventReceiver(ident, artifactPath, errChan)
if err != nil {
return nil, err
}
receiver = &fileReceiverWrapper{fileReceiver}
} else {
// Use existing HTTP-based event API
fmt.Println("Using HTTP-based event API")
httpReceiver, err := eventapi.New(ident, errChan)
if err != nil {

⚠️ Potential issue

Fix log typo and use structured logging instead of fmt.Println.

"file-base" → "file-based". Also prefer controller-runtime logger for consistency.

Apply this diff:

-		// Use file-based event API
-		fmt.Println("Using file-base event API")
-		// File receiver should look in the inputDir where ansible-runner writes artifacts
-		artifactPath := filepath.Join("/tmp/ansible-operator/runner/", r.GVK.Group, r.GVK.Version, r.GVK.Kind,
-			u.GetNamespace(), u.GetName())
-		fileReceiver, err := eventapi.NewFileEventReceiver(ident, artifactPath, errChan)
+		// Use file-based event API
+		logger.Info("Using file-based event API")
+		// File receiver should look in the inputDir where ansible-runner writes artifacts
+		basePath := filepath.Join("/tmp/ansible-operator/runner/", r.GVK.Group, r.GVK.Version, r.GVK.Kind,
+			u.GetNamespace(), u.GetName())
+		fileReceiver, err := eventapi.NewFileEventReceiver(ident, basePath, errChan)
 			if err != nil {
 				return nil, err
 			}
 			receiver = &fileReceiverWrapper{fileReceiver}
 		} else {
 			// Use existing HTTP-based event API
-			fmt.Println("Using HTTP-based event API")
+			logger.Info("Using HTTP-based event API")
 			httpReceiver, err := eventapi.New(ident, errChan)

Additionally, reuse basePath for inputDir.Path a few lines below:

inputDir := inputdir.InputDir{
    Path: basePath,
    // ...
}

Comment on lines +341 to +369
// For file-based API, give some time to process final event files
if _, isFileReceiver := receiver.(*fileReceiverWrapper); isFileReceiver {
logger.V(1).Info("Waiting for file-based event receiver to process final events")

// Check if artifacts directory exists and list contents
artifactsDir := filepath.Join(inputDir.Path, "artifacts", ident)
if entries, err := os.ReadDir(artifactsDir); err == nil {
logger.V(1).Info("Artifacts directory contents", "dir", artifactsDir, "entries", len(entries))
for _, entry := range entries {
logger.V(1).Info("Artifact entry", "name", entry.Name(), "isDir", entry.IsDir())
}
} else {
logger.Info("Could not read artifacts directory", "dir", artifactsDir, "error", err)
}

// Check job_events directory specifically
jobEventsDir := filepath.Join(inputDir.Path, "artifacts", ident, "job_events")
if entries, err := os.ReadDir(jobEventsDir); err == nil {
logger.V(1).Info("Job events directory contents", "dir", jobEventsDir, "entries", len(entries))
for _, entry := range entries {
logger.V(1).Info("Job event file", "name", entry.Name())
}
} else {
logger.Info("Could not read job_events directory", "dir", jobEventsDir, "error", err)
}

time.Sleep(5 * time.Second) // Increased delay for final file processing
}


🛠️ Refactor suggestion

Replace fixed sleep with a deterministic completion signal.

The 5s sleep is fragile and can either delay unnecessarily or still miss late writes. Prefer one of:

  • Close Events() after playbook_on_stats in FileEventReceiver and let callers range until close.
  • Or have FileEventReceiver expose a Done() <-chan struct{} fired after processing stats.

Interim mitigation (configurable delay) if you must keep a delay:

Apply this diff:

-			time.Sleep(5 * time.Second) // Increased delay for final file processing
+			if ms, err := strconv.Atoi(os.Getenv("RUNNER_FILEAPI_FLUSH_DELAY_MS")); err == nil && ms > 0 {
+				time.Sleep(time.Duration(ms) * time.Millisecond)
+			}

Note: requires strconv import (already present).
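
A sketch of the Done() option; the done/doneOnce fields and the 30s safety timeout are assumptions, not code from this PR:

// Done returns a channel that is closed once the terminal
// playbook_on_stats event has been processed.
func (r *FileEventReceiver) Done() <-chan struct{} { return r.done }

// In processEventFile, after a successful decode:
//     if event.Event == EventPlaybookOnStats {
//         r.doneOnce.Do(func() { close(r.done) })
//     }

// Caller side, replacing the fixed sleep:
//     select {
//     case <-fileReceiver.Done():
//     case <-time.After(30 * time.Second): // safety timeout
//     }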


Comment on lines +173 to +174
kubectl logs deployment/memcached-molecule-operator-controller-manager -c manager


🛠️ Refactor suggestion

Make the post-test log dump non-fatal and namespace-explicit.

With set -e, this unconditional logs call can abort cleanup if the deployment disappears/flaps. Also, be explicit about namespace to avoid context drift.

-kubectl logs deployment/memcached-molecule-operator-controller-manager -c manager
+kubectl logs -n memcached-molecule-operator-system deployment/memcached-molecule-operator-controller-manager -c manager || true

# but the upgraded version doesn't support the use case (protocol we are using).\
# Ref: https://github.com/operator-framework/ansible-operator-plugins/pull/67#issuecomment-2189164688
&& pipenv check --ignore 71064 \
# && pipenv check --ignore 71064 \

🛠️ Refactor suggestion

Don’t silently drop vulnerability checks; gate them via a build arg.

Skipping the check entirely hides regressions. Make it opt-in/out so CI can enable it while local builds can skip.

Apply:

-  # && pipenv check --ignore 71064 \
+  && if [ "${RUN_VULN_CHECK:-0}" = "1" ]; then pipenv check --ignore 71064; fi \

Additionally, add the ARG once near the top (outside this hunk):

ARG RUN_VULN_CHECK=0

Member Author

@chiragkyal chiragkyal left a comment


[By Claude] Sleep Commands Issue: The hard-coded sleep statements at lines 127, 154, and 338 are unreliable and should be replaced with proper file completion detection. Consider using file size stability checks or JSON validation approaches instead of arbitrary delays.

@chiragkyal
Member Author

[By Claude] Sleep Commands Review

I've identified several hard-coded sleep statements that should be replaced with more reliable file completion detection:

  • Line 127 in fileapi.go
  • Line 154 in fileapi.go
  • Line 367 in runner.go

Recommended solution: Replace with file size stability checking:

func (f *FileEventReceiver) waitForFileComplete(filename string, maxWait time.Duration) bool {
    const checkInterval = 10 * time.Millisecond
    const stableChecks = 3
    
    start := time.Now()
    var lastSize int64 = -1
    stableCount := 0
    
    for time.Since(start) < maxWait {
        stat, err := os.Stat(filename)
        if err != nil {
            time.Sleep(checkInterval)
            continue
        }
        
        currentSize := stat.Size()
        if currentSize == lastSize && currentSize > 0 {
            stableCount++
            if stableCount >= stableChecks {
                return true
            }
        } else {
            stableCount = 0
        }
        
        lastSize = currentSize
        time.Sleep(checkInterval)
    }
    
    return false
}
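
Call sites would then replace the fixed sleeps; for example, in the watcher loop (a sketch; the 2s budget is arbitrary):

if f.waitForFileComplete(event.Name, 2*time.Second) {
    f.processEventFile(event.Name)
} else {
    f.logger.V(2).Info("Event file never stabilized; skipping", "file", event.Name)
}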

@chiragkyal
Member Author

[By Claude] Debugging Code Issues

Line 240 in runner.go: Remove the TODO comment and implement proper environment variable handling:

useFileAPI := os.Getenv("ANSIBLE_RUNNER_USE_FILE_API") == "true"

Lines 246 & 257 in runner.go: Replace fmt.Println with proper logging:

log.Info("Using file-based event API")
log.Info("Using HTTP-based event API") 

This maintains consistency with the rest of the codebase logging patterns.

@chiragkyal
Member Author

[By Claude] Memory Leak & Resource Management Issues

Line 59 in fileapi.go: The processedFiles map[string]bool grows indefinitely and is never cleaned up. This will cause a memory leak over time.

Recommended solutions:

  1. Cleanup on Close(): Add cleanup in the Close method:
func (r *FileEventReceiver) Close() {
    r.mutex.Lock()
    r.stopped = true
    r.mutex.Unlock()
    
    // Cancel context and wait for goroutines
    if r.cancelFunc != nil {
        r.cancelFunc()
    }
    r.wg.Wait()
    
    // Cleanup processed files map
    r.processMutex.Lock()
    r.processedFiles = make(map[string]bool)
    r.processMutex.Unlock()
    
    r.logger.V(1).Info("File Event API stopped")
}
  2. Or use a bounded LRU cache: Consider replacing the map with a proper LRU cache with size limits.
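
A minimal bounded variant built on container/list (a sketch, not from this PR; guard it with the existing processMutex, since it is not goroutine-safe):

// boundedSet remembers at most max keys, evicting the oldest first.
// Import assumed: container/list.
type boundedSet struct {
    max   int
    order *list.List               // front = oldest key
    seen  map[string]*list.Element // key -> node in order
}

func newBoundedSet(max int) *boundedSet {
    return &boundedSet{max: max, order: list.New(), seen: make(map[string]*list.Element)}
}

// Add records key and reports whether it was already present.
func (s *boundedSet) Add(key string) (dup bool) {
    if _, ok := s.seen[key]; ok {
        return true
    }
    s.seen[key] = s.order.PushBack(key)
    if s.order.Len() > s.max {
        oldest := s.order.Front()
        s.order.Remove(oldest)
        delete(s.seen, oldest.Value.(string))
    }
    return false
}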
