diff --git a/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditAsyncQueueTest.java b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditAsyncQueueTest.java new file mode 100644 index 0000000000..894fbe1267 --- /dev/null +++ b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditAsyncQueueTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.queue; + +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.AuditHandler; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @generated by copilot + * @description Unit Test cases for AuditAsyncQueue + * */ +class AuditAsyncQueueTest { + private AuditHandler mockHandler; + private AuditAsyncQueue queue; + + @BeforeEach + void setUp() { + mockHandler = mock(AuditHandler.class); + queue = new AuditAsyncQueue(mockHandler); + } + + @Test + void testLogSingleEvent() { + AuditEventBase event = mock(AuditEventBase.class); + assertTrue(queue.log(event)); + assertEquals(1, queue.queue.size()); + } + + @Test + void testLogMultipleEvents() { + AuditEventBase event1 = mock(AuditEventBase.class); + AuditEventBase event2 = mock(AuditEventBase.class); + assertTrue(queue.log(Arrays.asList(event1, event2))); + assertEquals(2, queue.queue.size()); + } + + @Test + void testLogReturnsFalseWhenQueueFull() { + queue = spy(queue); + doReturn(0).when(queue).getMaxQueueSize(); + AuditEventBase event = mock(AuditEventBase.class); + assertFalse(queue.log(event)); + } + + @Test + void testStartAndStop() throws InterruptedException { + queue.start(); + assertNotNull(queue.consumerThread); + assertTrue(queue.consumerThread.isAlive()); + + queue.stop(); + // Give some time for thread to stop + TimeUnit.MILLISECONDS.sleep(100); + assertNull(queue.consumerThread); + } + + @Test + void testStopCallsConsumerStop() throws InterruptedException { + queue.start(); + queue.stop(); + TimeUnit.MILLISECONDS.sleep(100); + verify(mockHandler, atLeastOnce()).stop(); + } + + @Test + void testRunLogAuditDrainModeExitsWhenQueueEmpty() throws Exception { + // Set drain mode + queue.setDrain(true); + + // Queue is already empty in the setUp method + + // Should exit the loop without exceptions + assertDoesNotThrow(() -> queue.runLogAudit()); + + // Consumer.stop should be called when exiting the loop + verify(mockHandler).stop(); + } + + @Test + void testRunLogAuditDrainModeProcessesRemainingEvents() throws Exception { + // Add an event to the queue + AuditEventBase event = mock(AuditEventBase.class); + queue.queue.add(event); + + // Set drain mode + queue.setDrain(true); + + // Process the queue in drain mode + assertDoesNotThrow(() -> queue.runLogAudit()); + + // Event should be processed + verify(mockHandler).log(anyCollection()); + + // Consumer.stop should be called when exiting the loop + verify(mockHandler).stop(); + } + + @Test + void testRunLogAuditExitsWhenDrainMaxTimeElapsed() throws Exception { + // Create a spy to control isDrainMaxTimeElapsed behavior + AuditAsyncQueue queueSpy = spy(queue); + + // Add an event to prevent immediate exit due to empty queue + AuditEventBase event = mock(AuditEventBase.class); + queueSpy.queue.add(event); + + // Set drain mode + queueSpy.setDrain(true); + + // Make isDrainMaxTimeElapsed return true after first event + when(queueSpy.isDrainMaxTimeElapsed()) + .thenReturn(false) // First check after processing the event + .thenReturn(true); // Second check to break the loop + + // Should exit the loop after processing one event + assertDoesNotThrow(() -> queueSpy.runLogAudit()); + + // Event should be processed + verify(mockHandler).log(anyCollection()); + } + + @Test + void testConsumerStopThrowsException() throws Exception { + // Setup consumer to throw exception when stop is called + doThrow(new RuntimeException("Stop error")).when(mockHandler).stop(); + + // Set drain mode to trigger exit + queue.setDrain(true); + + // Should catch the exception from consumer.stop + assertDoesNotThrow(() -> queue.runLogAudit()); + + // Verify stop was called + verify(mockHandler).stop(); + } + + @Test + void testStartWithNullConsumer() { + // Create queue with null consumer + AuditAsyncQueue queueWithNullConsumer = new AuditAsyncQueue(null); + + // Should not throw exception, but log error + assertDoesNotThrow(() -> queueWithNullConsumer.start()); + + // Thread should still be created + assertNotNull(queueWithNullConsumer.consumerThread); + assertTrue(queueWithNullConsumer.consumerThread.isAlive()); + + // Clean up + queueWithNullConsumer.stop(); + } +} diff --git a/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditBatchQueueTest.java b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditBatchQueueTest.java new file mode 100644 index 0000000000..4761f88992 --- /dev/null +++ b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditBatchQueueTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.queue; + +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.AuditHandler; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @generated by copilot + * @description Unit Test cases for AuditBatchQueue + * */ +class AuditBatchQueueTest { + private AuditHandler mockHandler; + private AuditBatchQueue queue; + + @BeforeEach + void setUp() { + mockHandler = mock(AuditHandler.class); + queue = new AuditBatchQueue(mockHandler); + queue.setMaxQueueSize(10); + queue.setMaxBatchSize(5); + queue.setMaxBatchInterval(100); + } + + @Test + void testLogSingleEvent() { + queue.start(); + AuditEventBase event = mock(AuditEventBase.class); + assertTrue(queue.log(event)); + } + + @Test + void testLogMultipleEvents() { + queue.start(); + AuditEventBase event1 = mock(AuditEventBase.class); + AuditEventBase event2 = mock(AuditEventBase.class); + assertTrue(queue.log(Arrays.asList(event1, event2))); + } + + @Test + void testStartAndStop() throws InterruptedException { + queue.start(); + assertNotNull(queue.consumerThread); + assertTrue(queue.consumerThread.isAlive()); + + queue.stop(); + TimeUnit.MILLISECONDS.sleep(100); + assertNull(queue.consumerThread); + } + + @Test + void testStopCallsConsumerStop() throws InterruptedException { + queue.start(); + queue.stop(); + TimeUnit.MILLISECONDS.sleep(100); + verify(mockHandler, atLeastOnce()).stop(); + } + + @Test + void testDoubleStartDoesNotCreateNewThread() { + queue.start(); + Thread firstThread = queue.consumerThread; + queue.start(); // Should not create a new thread + assertEquals(firstThread, queue.consumerThread); + } + + @Test + void testLogAfterStopDoesNotThrow() { + queue.start(); + queue.stop(); + AuditEventBase event = mock(AuditEventBase.class); + assertDoesNotThrow(() -> queue.log(event)); + } + + @Test + void testFlushCallsConsumerFlush() { + queue.start(); + queue.flush(); + verify(mockHandler, atLeastOnce()).flush(); + } + + @Test + void testWaitToCompleteCallsConsumerWaitToComplete() { + queue.start(); + queue.waitToComplete(); + verify(mockHandler, atLeastOnce()).waitToComplete(anyLong()); + } + + @Test + void testLogEmptyCollectionReturnsTrue() { + queue.start(); + assertTrue(queue.log(Collections.emptyList())); + } + + @Test + void testWaitToCompleteWithTimeout() { + queue.start(); + queue.waitToComplete(100); + verify(mockHandler).waitToComplete(100); + } + + @Test + void testFlush() { + queue.start(); + queue.flush(); + verify(mockHandler).flush(); + } + + @Test + void testRunLogAuditWithFileSpooling() throws Exception { + // Setup fileSpooler + AuditFileSpool mockSpooler = mock(AuditFileSpool.class); + when(mockSpooler.isPending()).thenReturn(true); + when(mockSpooler.getLastAttemptTimeDelta()).thenReturn(6000L); // Greater than default maxWaitTime + + // Set fileSpooler using reflection + Field fileSpoolerField = AuditQueue.class.getDeclaredField("fileSpooler"); + fileSpoolerField.setAccessible(true); + fileSpoolerField.set(queue, mockSpooler); + + queue.fileSpoolerEnabled = true; + queue.fileSpoolMaxWaitTime = 5000; + queue.fileSpoolDrainThresholdPercent = 80; + + queue.start(); + + // Add event to trigger processing + AuditEventBase mockEvent = mock(AuditEventBase.class); + queue.log(mockEvent); + + // Wait for processing + TimeUnit.MILLISECONDS.sleep(200); + + // Verify event was spooled due to fileSpoolDrain condition + verify(mockSpooler, atLeastOnce()).stashLogs(anyCollection()); + + queue.stop(); + } + + @Test + void testRunLogAuditWithDestinationFailure() throws Exception { + // Setup consumer to return failure + when(mockHandler.log(anyCollection())).thenReturn(false); + + // Setup fileSpooler + AuditFileSpool mockSpooler = mock(AuditFileSpool.class); + + // Set fileSpooler using reflection + Field fileSpoolerField = AuditQueue.class.getDeclaredField("fileSpooler"); + fileSpoolerField.setAccessible(true); + fileSpoolerField.set(queue, mockSpooler); + + queue.fileSpoolerEnabled = true; + + queue.start(); + + // Add event to trigger processing + AuditEventBase mockEvent = mock(AuditEventBase.class); + queue.log(mockEvent); + + // Wait for processing + TimeUnit.MILLISECONDS.sleep(200); + + // Verify event was stashed due to consumer failure + verify(mockSpooler, atLeastOnce()).stashLogs(anyCollection()); + + queue.stop(); + } + + @Test + void testWaitToCompleteStaticLoop() throws Exception { + queue.start(); + Field queueField = AuditBatchQueue.class.getDeclaredField("queue"); + queueField.setAccessible(true); + BlockingQueue blockingQueue = mock(BlockingQueue.class); + when(blockingQueue.isEmpty()).thenReturn(false); + when(blockingQueue.size()).thenReturn(1); + queueField.set(queue, blockingQueue); + + Field localBatchBufferField = AuditBatchQueue.class.getDeclaredField("localBatchBuffer"); + localBatchBufferField.setAccessible(true); + localBatchBufferField.set(queue, new ArrayList<>()); + + // This will cause staticLoopCount to exceed 5 and break + queue.waitToComplete(10); + verify(mockHandler, atLeastOnce()).waitToComplete(anyLong()); + } + + @Test + void testRunHandlesThrowable() throws Exception { + AuditBatchQueue testQueue = spy(new AuditBatchQueue(mockHandler)); + doThrow(new RuntimeException("test")).when(testQueue).runLogAudit(); + testQueue.run(); // Should log error, not throw + } + + @Test + void testRunLogAuditDrainModeExits() throws Exception { + queue.start(); + queue.setDrain(true); + // Wait for thread to exit + TimeUnit.MILLISECONDS.sleep(200); + assertTrue(queue.isDrain()); + queue.stop(); + } + + @Test + void testFlushWithFileSpooler() throws Exception { + AuditFileSpool mockSpooler = mock(AuditFileSpool.class); + Field fileSpoolerField = AuditQueue.class.getDeclaredField("fileSpooler"); + fileSpoolerField.setAccessible(true); + fileSpoolerField.set(queue, mockSpooler); + queue.fileSpoolerEnabled = true; + queue.flush(); + verify(mockSpooler).flush(); + verify(mockHandler).flush(); + } + + @Test + void testLogInterruptedException() throws Exception { + queue.start(); + Field queueField = AuditBatchQueue.class.getDeclaredField("queue"); + queueField.setAccessible(true); + BlockingQueue blockingQueue = mock(BlockingQueue.class); + doThrow(new InterruptedException()).when(blockingQueue).put(any()); + queueField.set(queue, blockingQueue); + AuditEventBase event = mock(AuditEventBase.class); + assertThrows(RuntimeException.class, () -> queue.log(event)); + } +} diff --git a/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileCacheProviderSpoolTest.java b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileCacheProviderSpoolTest.java new file mode 100644 index 0000000000..aefd075667 --- /dev/null +++ b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileCacheProviderSpoolTest.java @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.queue; + +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.AuditHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.PrintWriter; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @generated by copilot + * @description Unit Test cases for AuditFileCacheProviderSpool + * */ +class AuditFileCacheProviderSpoolTest { + private AuditHandler mockHandler; + private AuditFileCacheProviderSpool spool; + private File tempDir; + private Properties props; + + @BeforeEach + void setUp() throws Exception { + mockHandler = mock(AuditHandler.class); + tempDir = Files.createTempDirectory("auditspooltest").toFile(); + tempDir.deleteOnExit(); + + props = new Properties(); + props.setProperty("xasecure.audit.filespool.filespool.dir", tempDir.getAbsolutePath()); + props.setProperty("xasecure.audit.filespool.filespool.archive.dir", new File(tempDir, "archive").getAbsolutePath()); + props.setProperty("xasecure.audit.filespool.filespool.index.filename", "index_test.json"); + props.setProperty("xasecure.audit.provider.filecache.is.enabled", "true"); + + spool = new AuditFileCacheProviderSpool(mockHandler); + } + + @AfterEach + void tearDown() { + if (spool != null) { + spool.stop(); + } + if (tempDir != null && tempDir.exists()) { + File[] files = tempDir.listFiles(); + if (files != null) { + for (File f : files) { + f.delete(); + } + } + tempDir.delete(); + } + } + + @Test + void testInitCreatesFilesAndFolders() { + boolean result = spool.init(props, "xasecure.audit.filespool"); + assertTrue(result); + assertTrue(spool.logFolder.exists()); + assertTrue(spool.archiveFolder.exists()); + assertTrue(spool.indexFile.exists()); + assertTrue(spool.indexDoneFile.exists()); + } + + @Test + void testStartAndStopThread() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + spool.start(); + assertNotNull(spool.destinationThread); + assertTrue(spool.destinationThread.isAlive()); + spool.stop(); + assertNull(spool.destinationThread); + } + + @Test + void testStashSingleEvent() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + AuditEventBase event = mock(AuditEventBase.class); + spool.stashLogs(event); + assertTrue(spool.isPending()); + assertTrue(spool.isSpoolingSuccessful()); + } + + @Test + void testStashMultipleEvents() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + AuditEventBase event1 = mock(AuditEventBase.class); + AuditEventBase event2 = mock(AuditEventBase.class); + List events = Arrays.asList(event1, event2); + spool.stashLogs(events); + assertTrue(spool.isPending()); + } + + @Test + void testFlushDoesNotThrow() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + assertDoesNotThrow(() -> spool.flush()); + } + + @Test + void testIsPendingBeforeAndAfterInit() { + assertFalse(spool.isPending()); + assertTrue(spool.init(props, "xasecure.audit.filespool")); + assertFalse(spool.isPending()); + } + + @Test + void testDoubleInitReturnsTrue() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + assertTrue(spool.init(props, "xasecure.audit.filespool")); // Should log error and return true + } + + @Test + void testStashLogsStringCollection() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + List events = Arrays.asList("event1", "event2", "event3"); + assertDoesNotThrow(() -> spool.stashLogsString(events)); + } + + @Test + void testStashLogsStringAfterStop() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + spool.stop(); + spool.stashLogsString("event after stop"); + assertTrue(true); // Should not throw + } + + @Test + void testStashLogsAfterStop() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + spool.stop(); + AuditEventBase event = mock(AuditEventBase.class); + spool.stashLogs(event); // Should log error but not throw + assertTrue(true); + } + + @Test + void testStartWithoutInit() { + AuditFileCacheProviderSpool uninitSpool = new AuditFileCacheProviderSpool(mockHandler); + uninitSpool.start(); + assertNull(uninitSpool.destinationThread); + } + + @Test + void testFlushWithoutInit() { + AuditFileCacheProviderSpool uninitSpool = new AuditFileCacheProviderSpool(mockHandler); + assertDoesNotThrow(uninitSpool::flush); + } + + @Test + void testGetLastAttemptTimeDelta() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + assertEquals(0, spool.getLastAttemptTimeDelta()); + } + + @Test + void testRollOverSpoolFileByTime() throws Exception { + // Set up a very short rollover time + props.setProperty("xasecure.audit.filespool.filespool.file.rollover.sec", "1"); // 1 second for quick testing + spool.init(props, "xasecure.audit.filespool"); + + // Get access to private field + Field fileRolloverSecField = AuditFileCacheProviderSpool.class.getDeclaredField("fileRolloverSec"); + fileRolloverSecField.setAccessible(true); + fileRolloverSecField.set(spool, 1); // 1 second for quick testing + + // Create initial log file + AuditEventBase mockEvent = mock(AuditEventBase.class); + spool.stashLogs(mockEvent); + + // Access the current writer record using reflection + Field writerRecordField = AuditFileCacheProviderSpool.class.getDeclaredField("currentWriterIndexRecord"); + writerRecordField.setAccessible(true); + AuditFileCacheProviderSpool.AuditIndexRecord firstRecord = + (AuditFileCacheProviderSpool.AuditIndexRecord) writerRecordField.get(spool); + String firstFilePath = firstRecord.filePath; + + // Sleep to trigger rollover time + Thread.sleep(1500); + + // Write another event which should trigger rollover + spool.stashLogs(mockEvent); + + // Get the new writer record + AuditFileCacheProviderSpool.AuditIndexRecord secondRecord = + (AuditFileCacheProviderSpool.AuditIndexRecord) writerRecordField.get(spool); + + // Assert file has changed + assertNotEquals(firstFilePath, secondRecord.filePath); + + // Verify first file was added to the index queue + Field indexQueueField = AuditFileCacheProviderSpool.class.getDeclaredField("indexQueue"); + indexQueueField.setAccessible(true); + BlockingQueue indexQueue = + (BlockingQueue) indexQueueField.get(spool); + + boolean fileInQueue = false; + for (AuditFileCacheProviderSpool.AuditIndexRecord record : indexQueue) { + if (record.filePath.equals(firstFilePath)) { + fileInQueue = true; + break; + } + } + assertTrue(fileInQueue, "First file should be in the index queue after rollover"); + } + + @Test + void testCloseFileIfNeeded() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Create log file by writing an event + AuditEventBase mockEvent = mock(AuditEventBase.class); + spool.stashLogs(mockEvent); + + // Set closeFile flag to true + Field closeFileField = AuditFileCacheProviderSpool.class.getDeclaredField("closeFile"); + closeFileField.setAccessible(true); + closeFileField.set(spool, true); + + // Get access to private closeFileIfNeeded method + Method closeFileIfNeededMethod = AuditFileCacheProviderSpool.class.getDeclaredMethod("closeFileIfNeeded"); + closeFileIfNeededMethod.setAccessible(true); + + // Get current writer record before closing + Field writerRecordField = AuditFileCacheProviderSpool.class.getDeclaredField("currentWriterIndexRecord"); + writerRecordField.setAccessible(true); + AuditFileCacheProviderSpool.AuditIndexRecord recordBeforeClose = + (AuditFileCacheProviderSpool.AuditIndexRecord) writerRecordField.get(spool); + + // Invoke the method + closeFileIfNeededMethod.invoke(spool); + + // Verify the writer is now null + Field logWriterField = AuditFileCacheProviderSpool.class.getDeclaredField("logWriter"); + logWriterField.setAccessible(true); + assertNull(logWriterField.get(spool)); + + // Verify currentWriterIndexRecord is null + assertNull(writerRecordField.get(spool)); + + // Verify record was added to indexQueue + Field indexQueueField = AuditFileCacheProviderSpool.class.getDeclaredField("indexQueue"); + indexQueueField.setAccessible(true); + BlockingQueue indexQueue = + (BlockingQueue) indexQueueField.get(spool); + + boolean found = false; + for (AuditFileCacheProviderSpool.AuditIndexRecord record : indexQueue) { + if (record.filePath.equals(recordBeforeClose.filePath)) { + found = true; + assertEquals(AuditFileCacheProviderSpool.SPOOL_FILE_STATUS.pending, record.status); + break; + } + } + assertTrue(found, "Closed file should be added to the index queue with pending status"); + } + + @Test + void testSendEvent() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Create mock events + List mockEvents = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + mockEvents.add(mock(AuditEventBase.class)); + } + + // Create index record + AuditFileCacheProviderSpool.AuditIndexRecord indexRecord = new AuditFileCacheProviderSpool.AuditIndexRecord(); + indexRecord.id = UUID.randomUUID().toString(); + indexRecord.filePath = "test_path"; + + // Mock the handler to succeed + when(mockHandler.log(anyCollection())).thenReturn(true); + + // Get access to private sendEvent method + Method sendEventMethod = AuditFileCacheProviderSpool.class.getDeclaredMethod( + "sendEvent", List.class, AuditFileCacheProviderSpool.AuditIndexRecord.class, int.class); + sendEventMethod.setAccessible(true); + + // Invoke the method + boolean result = (Boolean) sendEventMethod.invoke(spool, mockEvents, indexRecord, 3); + + // Verify result + assertTrue(result); + verify(mockHandler).log(mockEvents); + + // Verify the lastSuccessTime was set + assertNotNull(indexRecord.lastSuccessTime); + } + + @Test + void testAppendToDoneFile() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Create a test file + File testFile = new File(tempDir, "test_file.log"); + testFile.createNewFile(); + + // Create index record + AuditFileCacheProviderSpool.AuditIndexRecord indexRecord = new AuditFileCacheProviderSpool.AuditIndexRecord(); + indexRecord.id = UUID.randomUUID().toString(); + indexRecord.filePath = testFile.getAbsolutePath(); + indexRecord.status = AuditFileCacheProviderSpool.SPOOL_FILE_STATUS.done; + indexRecord.writeCompleteTime = new Date(); + + // Get access to appendToDoneFile method + Method appendToDoneFileMethod = AuditFileCacheProviderSpool.class.getDeclaredMethod( + "appendToDoneFile", AuditFileCacheProviderSpool.AuditIndexRecord.class); + appendToDoneFileMethod.setAccessible(true); + + // Invoke the method + appendToDoneFileMethod.invoke(spool, indexRecord); + + // Verify the done file was created and contains data + assertTrue(spool.indexDoneFile.exists()); + + // Read the file content + BufferedReader reader = new BufferedReader(new FileReader(spool.indexDoneFile)); + StringBuilder content = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + content.append(line); + } + reader.close(); + + // Verify the content contains the record ID + assertTrue(content.toString().contains(indexRecord.id)); + + // Verify flush was called + verify(mockHandler, times(2)).flush(); + + // Verify file was archived (moved to archive folder) + File archiveFile = new File(spool.archiveFolder, testFile.getName()); + assertTrue(archiveFile.exists() || !testFile.exists(), + "Either the file should be moved to archive or deleted"); + } + + @Test + void testSaveIndexFile() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Add some records to indexRecords + Field indexRecordsField = AuditFileCacheProviderSpool.class.getDeclaredField("indexRecords"); + indexRecordsField.setAccessible(true); + List indexRecords = + (List) indexRecordsField.get(spool); + + // Create and add sample records + AuditFileCacheProviderSpool.AuditIndexRecord record1 = new AuditFileCacheProviderSpool.AuditIndexRecord(); + record1.id = "test-id-1"; + record1.filePath = "/test/path1"; + record1.status = AuditFileCacheProviderSpool.SPOOL_FILE_STATUS.pending; + + AuditFileCacheProviderSpool.AuditIndexRecord record2 = new AuditFileCacheProviderSpool.AuditIndexRecord(); + record2.id = "test-id-2"; + record2.filePath = "/test/path2"; + record2.status = AuditFileCacheProviderSpool.SPOOL_FILE_STATUS.write_inprogress; + + indexRecords.add(record1); + indexRecords.add(record2); + + // Call saveIndexFile + Method saveIndexFileMethod = AuditFileCacheProviderSpool.class.getDeclaredMethod("saveIndexFile"); + saveIndexFileMethod.setAccessible(true); + saveIndexFileMethod.invoke(spool); + + // Verify the index file was created and contains expected content + assertTrue(spool.indexFile.exists()); + + // Read the file content + BufferedReader reader = new BufferedReader(new FileReader(spool.indexFile)); + List fileLines = new ArrayList<>(); + String line; + while ((line = reader.readLine()) != null) { + fileLines.add(line); + } + reader.close(); + + // Verify the content includes both records + assertEquals(2, fileLines.size()); + assertTrue(fileLines.get(0).contains("test-id-1")); + assertTrue(fileLines.get(1).contains("test-id-2")); + } + + @Test + void testRemoveIndexRecord() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Add some records to indexRecords + Field indexRecordsField = AuditFileCacheProviderSpool.class.getDeclaredField("indexRecords"); + indexRecordsField.setAccessible(true); + List indexRecords = + (List) indexRecordsField.get(spool); + + // Create and add sample records + AuditFileCacheProviderSpool.AuditIndexRecord record1 = new AuditFileCacheProviderSpool.AuditIndexRecord(); + record1.id = "test-id-1"; + record1.filePath = "/test/path1"; + + AuditFileCacheProviderSpool.AuditIndexRecord record2 = new AuditFileCacheProviderSpool.AuditIndexRecord(); + record2.id = "test-id-2"; + record2.filePath = "/test/path2"; + + indexRecords.add(record1); + indexRecords.add(record2); + + // Call removeIndexRecord + Method removeIndexRecordMethod = AuditFileCacheProviderSpool.class.getDeclaredMethod( + "removeIndexRecord", AuditFileCacheProviderSpool.AuditIndexRecord.class); + removeIndexRecordMethod.setAccessible(true); + removeIndexRecordMethod.invoke(spool, record1); + + // Verify record1 was removed + assertEquals(1, indexRecords.size()); + assertEquals("test-id-2", indexRecords.get(0).id); + + // Remove the last record + removeIndexRecordMethod.invoke(spool, record2); + + // Verify all records removed + assertTrue(indexRecords.isEmpty()); + + // Verify isDestDown was reset when all records are removed + Field isDestDownField = AuditFileCacheProviderSpool.class.getDeclaredField("isDestDown"); + isDestDownField.setAccessible(true); + assertFalse((Boolean) isDestDownField.get(spool)); + } + + @Test + void testLogError() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Get access to logError and lastErrorLogMS field + Method logErrorMethod = AuditFileCacheProviderSpool.class.getDeclaredMethod("logError", String.class); + logErrorMethod.setAccessible(true); + + Field lastErrorLogMSField = AuditFileCacheProviderSpool.class.getDeclaredField("lastErrorLogMS"); + lastErrorLogMSField.setAccessible(true); + + // Set lastErrorLogMS to a value in the past + lastErrorLogMSField.set(spool, System.currentTimeMillis() - 60000); // 1 minute ago + + // Call logError + logErrorMethod.invoke(spool, "Test error message"); + + // Verify lastErrorLogMS was updated + long currentLastErrorLogMS = (Long) lastErrorLogMSField.get(spool); + assertTrue(System.currentTimeMillis() - currentLastErrorLogMS < 1000); // Should be recent + + // Call logError again immediately (should not log) + logErrorMethod.invoke(spool, "Another test error"); + + // lastErrorLogMS should not have changed + assertEquals(currentLastErrorLogMS, (Long) lastErrorLogMSField.get(spool)); + } + + @Test + void testGetLogFileStream() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Get access to getLogFileStream method + Method getLogFileStreamMethod = AuditFileCacheProviderSpool.class.getDeclaredMethod("getLogFileStream"); + getLogFileStreamMethod.setAccessible(true); + + // Access the logWriter field + Field logWriterField = AuditFileCacheProviderSpool.class.getDeclaredField("logWriter"); + logWriterField.setAccessible(true); + + // First call - should create new file + PrintWriter writer1 = (PrintWriter) getLogFileStreamMethod.invoke(spool); + assertNotNull(writer1); + + // Writer should be stored in logWriter field + assertEquals(writer1, logWriterField.get(spool)); + + // Second call - should return same writer + PrintWriter writer2 = (PrintWriter) getLogFileStreamMethod.invoke(spool); + assertEquals(writer1, writer2); + + // Verify currentWriterIndexRecord was created + Field currentWriterIndexRecordField = AuditFileCacheProviderSpool.class.getDeclaredField("currentWriterIndexRecord"); + currentWriterIndexRecordField.setAccessible(true); + assertNotNull(currentWriterIndexRecordField.get(spool)); + } + + @Test + void testPrintIndex() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Add some records to indexRecords + Field indexRecordsField = AuditFileCacheProviderSpool.class.getDeclaredField("indexRecords"); + indexRecordsField.setAccessible(true); + List indexRecords = + (List) indexRecordsField.get(spool); + + // Create and add sample records + AuditFileCacheProviderSpool.AuditIndexRecord record1 = new AuditFileCacheProviderSpool.AuditIndexRecord(); + record1.id = "test-id-1"; + record1.filePath = "/test/path1"; + + indexRecords.add(record1); + + // Get access to printIndex method + Method printIndexMethod = AuditFileCacheProviderSpool.class.getDeclaredMethod("printIndex"); + printIndexMethod.setAccessible(true); + + // Call printIndex - just verify it doesn't throw an exception + assertDoesNotThrow(() -> printIndexMethod.invoke(spool)); + } +} diff --git a/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileQueueSpoolTest.java b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileQueueSpoolTest.java new file mode 100644 index 0000000000..fed353c4d7 --- /dev/null +++ b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileQueueSpoolTest.java @@ -0,0 +1,561 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.queue; + +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.model.AuditIndexRecord; +import org.apache.ranger.audit.model.SPOOL_FILE_STATUS; +import org.apache.ranger.audit.provider.AuditHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @generated by copilot + * @description Unit Test cases for AuditFileQueueSpool + * */ +class AuditFileQueueSpoolTest { + private AuditHandler mockHandler; + private AuditFileQueueSpool spool; + private File tempDir; + private Properties props; + + @BeforeEach + void setUp() throws Exception { + mockHandler = mock(AuditHandler.class); + tempDir = Files.createTempDirectory("auditspooltest").toFile(); + tempDir.deleteOnExit(); + + props = new Properties(); + props.setProperty("xasecure.audit.filespool.filespool.dir", tempDir.getAbsolutePath()); + props.setProperty("xasecure.audit.filespool.filespool.archive.dir", new File(tempDir, "archive").getAbsolutePath()); + props.setProperty("xasecure.audit.filespool.filespool.index.filename", "index_test.json"); + + spool = new AuditFileQueueSpool(mockHandler); + } + + @AfterEach + void tearDown() { + if (spool != null) { + spool.stop(); + } + if (tempDir != null && tempDir.exists()) { + File[] files = tempDir.listFiles(); + if (files != null) { + for (File f : files) { + f.delete(); + } + } + tempDir.delete(); + } + } + + @Test + void testInitCreatesFilesAndFolders() { + boolean result = spool.init(props, "xasecure.audit.filespool"); + assertTrue(result); + assertTrue(spool.logFolder.exists()); + assertTrue(spool.archiveFolder.exists()); + assertTrue(spool.indexFile.exists()); + assertTrue(spool.indexDoneFile.exists()); + } + + @Test + void testStartAndStopThread() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + spool.start(); + assertNotNull(spool.destinationThread); + assertTrue(spool.destinationThread.isAlive()); + spool.stop(); + assertNull(spool.destinationThread); + } + + @Test + void testStashSingleEvent() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + AuditEventBase event = mock(AuditEventBase.class); + spool.stashLogs(event); + assertTrue(spool.isPending()); + assertTrue(spool.isSpoolingSuccessful()); + } + + @Test + void testStashMultipleEvents() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + AuditEventBase event1 = mock(AuditEventBase.class); + AuditEventBase event2 = mock(AuditEventBase.class); + List events = Arrays.asList(event1, event2); + spool.stashLogs(events); + assertTrue(spool.isPending()); + } + + @Test + void testFlushDoesNotThrow() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + assertDoesNotThrow(() -> spool.flush()); + } + + @Test + void testIsPendingBeforeAndAfterInit() { + assertFalse(spool.isPending()); + assertTrue(spool.init(props, "xasecure.audit.filespool")); + assertFalse(spool.isPending()); + } + + @Test + void testInitFailsWithMissingLogFolder() { + Properties badProps = new Properties(); + badProps.setProperty("xasecure.audit.filespool.filespool.dir", ""); + boolean result = spool.init(badProps, "xasecure.audit.filespool"); + assertFalse(result); + } + + @Test + void testInitFailsWithUncreatableLogFolder() { + Properties badProps = new Properties(); + // Use an invalid path + badProps.setProperty("xasecure.audit.filespool.filespool.dir", "/dev/null/invalid"); + boolean result = spool.init(badProps, "xasecure.audit.filespool"); + assertFalse(result); + } + + @Test + void testStopWithoutInit() { + AuditFileQueueSpool uninitSpool = new AuditFileQueueSpool(mockHandler); + assertDoesNotThrow(uninitSpool::stop); + } + + @Test + void testFlushWithoutInit() { + AuditFileQueueSpool uninitSpool = new AuditFileQueueSpool(mockHandler); + assertDoesNotThrow(uninitSpool::flush); + } + + @Test + void testIsSpoolingSuccessfulFlag() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + assertTrue(spool.isSpoolingSuccessful()); + } + + @Test + void testGetLastAttemptTimeDelta() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + long delta = spool.getLastAttemptTimeDelta(); + assertEquals(0, delta); + } + + @Test + void testStashLogsAfterStop() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + spool.stop(); + AuditEventBase event = mock(AuditEventBase.class); + spool.stashLogs(event); // Should log error but not throw + assertTrue(true); // If no exception, test passes + } + + @Test + void testStashLogsStringAfterStop() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + spool.stop(); + spool.stashLogsString("event after stop"); + assertTrue(true); + } + + @Test + void testDoubleInitReturnsTrue() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + // Second init should log error and return true + assertTrue(spool.init(props, "xasecure.audit.filespool")); + } + + @Test + void testStartWithoutInit() { + AuditFileQueueSpool uninitSpool = new AuditFileQueueSpool(mockHandler); + // Should log error and not start thread + uninitSpool.start(); + assertNull(uninitSpool.destinationThread); + } + + @Test + void testInitWithInvalidDirectories() throws Exception { + // Setup with invalid directory that can't be created + Properties props = new Properties(); + props.setProperty("xasecure.audit.filespool.filespool.dir", "/proc/invalid-dir"); + + mockHandler = mock(AuditHandler.class); + when(mockHandler.getName()).thenReturn("TestConsumer"); + + spool = new AuditFileQueueSpool(mockHandler); + spool.init(props); + + // Check if initDone is false + java.lang.reflect.Field initDoneField = AuditFileQueueSpool.class.getDeclaredField("initDone"); + initDoneField.setAccessible(true); + boolean initDone = (boolean) initDoneField.get(spool); + assertFalse(initDone, "Init should fail with invalid directory"); + + // Same test with invalid archive directory + props = createBaseProperties(); + props.setProperty("xasecure.audit.filespool.filespool.archive.dir", "/proc/invalid-archive-dir"); + + spool = new AuditFileQueueSpool(mockHandler); + spool.init(props); + + initDone = (boolean) initDoneField.get(spool); + assertFalse(initDone, "Init should fail with invalid archive directory"); + } + + @Test + void testRollOverSpoolFileByTime() throws Exception { + // Set up a very short rollover time + props.setProperty("xasecure.audit.filespool.filespool.file.rollover.sec", "1"); // 1 second for quick testing + spool.init(props); + + // Create initial log file + AuditEventBase mockEvent = mock(AuditEventBase.class); + spool.stashLogs(mockEvent); + + // Access the current writer record using reflection + java.lang.reflect.Field writerRecordField = AuditFileQueueSpool.class.getDeclaredField("currentWriterIndexRecord"); + writerRecordField.setAccessible(true); + AuditIndexRecord firstRecord = (AuditIndexRecord) writerRecordField.get(spool); + String firstFilePath = firstRecord.getFilePath(); + + // Sleep to trigger rollover time + Thread.sleep(1500); + + // Write another event which should trigger rollover + spool.stashLogs(mockEvent); + + // Get the new writer record + AuditIndexRecord secondRecord = (AuditIndexRecord) writerRecordField.get(spool); + + // Assert file has changed + assertNotEquals(firstFilePath, secondRecord.getFilePath()); + + // Check if the first file is in the queue + java.lang.reflect.Field indexQueueField = AuditFileQueueSpool.class.getDeclaredField("indexQueue"); + indexQueueField.setAccessible(true); + BlockingQueue indexQueue = (BlockingQueue) indexQueueField.get(spool); + + boolean fileInQueue = false; + for (AuditIndexRecord record : indexQueue) { + if (record.getFilePath().equals(firstFilePath)) { + fileInQueue = true; + break; + } + } + assertTrue(fileInQueue, "First file should be in the index queue after rollover"); + } + + @Test + void testLogEvent() throws Exception { + spool.init(props); + + // Create a file with audit events + File testFile = new File(tempDir, "test_log_event.log"); + try (PrintWriter writer = new PrintWriter(testFile)) { + writer.println("{\"type\":\"ranger_audit\",\"id\":\"test_id_1\"}"); + writer.println("{\"type\":\"ranger_audit\",\"id\":\"test_id_2\"}"); + } + + // Create a BufferedReader for the file + BufferedReader reader = new BufferedReader(new java.io.FileReader(testFile)); + + // Get access to the private logEvent method + java.lang.reflect.Method logEventMethod = AuditFileQueueSpool.class.getDeclaredMethod( + "logEvent", BufferedReader.class); + logEventMethod.setAccessible(true); + + // Prepare mock handler to return true + when(mockHandler.log(anyCollection())).thenReturn(true); + + // Prepare current consumer index record + AuditIndexRecord record = new AuditIndexRecord(); + record.setId(UUID.randomUUID().toString()); + record.setFilePath(testFile.getAbsolutePath()); + record.setLinePosition(0); + + java.lang.reflect.Field consumerRecordField = AuditFileQueueSpool.class.getDeclaredField("currentConsumerIndexRecord"); + consumerRecordField.setAccessible(true); + consumerRecordField.set(spool, record); + + // Invoke the method + logEventMethod.invoke(spool, reader); + + // Verify handler was called + verify(mockHandler, atLeastOnce()).log(anyCollection()); + + reader.close(); + } + + @Test + void testSendEvent() throws Exception { + spool.init(props); + + // Create mock events + List mockEvents = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + mockEvents.add(mock(AuditEventBase.class)); + } + + // Create index record + AuditIndexRecord indexRecord = new AuditIndexRecord(); + indexRecord.setId(UUID.randomUUID().toString()); + indexRecord.setFilePath("test_path"); + + // Mock the handler to succeed + when(mockHandler.log(anyCollection())).thenReturn(true); + + // Get access to private sendEvent method + java.lang.reflect.Method sendEventMethod = AuditFileQueueSpool.class.getDeclaredMethod( + "sendEvent", List.class, AuditIndexRecord.class, int.class); + sendEventMethod.setAccessible(true); + + // Invoke the method + boolean result = (Boolean) sendEventMethod.invoke(spool, mockEvents, indexRecord, 3); + + // Verify result + assertTrue(result); + verify(mockHandler).log(mockEvents); + } + + @Test + void testLogFile() throws Exception { + spool.init(props); + + // Create a file with audit events + File testFile = new File(tempDir, "test_log_file.log"); + try (PrintWriter writer = new PrintWriter(testFile)) { + writer.println("{\"type\":\"ranger_audit\",\"id\":\"test_id_1\"}"); + writer.println("{\"type\":\"ranger_audit\",\"id\":\"test_id_2\"}"); + } + + // Set up mock handler + when(mockHandler.logFile(any(File.class))).thenReturn(true); + + // Prepare current consumer index record + AuditIndexRecord record = new AuditIndexRecord(); + record.setId(UUID.randomUUID().toString()); + record.setFilePath(testFile.getAbsolutePath()); + + java.lang.reflect.Field consumerRecordField = AuditFileQueueSpool.class.getDeclaredField("currentConsumerIndexRecord"); + consumerRecordField.setAccessible(true); + consumerRecordField.set(spool, record); + + // Get access to private logFile method + java.lang.reflect.Method logFileMethod = AuditFileQueueSpool.class.getDeclaredMethod( + "logFile", File.class); + logFileMethod.setAccessible(true); + + // Invoke the method + logFileMethod.invoke(spool, testFile); + + // Verify handler was called + verify(mockHandler).logFile(testFile); + } + + @Test + void testSendFile() throws Exception { + spool.init(props); + + // Create a file with audit events + File testFile = new File(tempDir, "test_send_file.log"); + try (PrintWriter writer = new PrintWriter(testFile)) { + writer.println("{\"type\":\"ranger_audit\",\"id\":\"test_id_1\"}"); + writer.println("{\"type\":\"ranger_audit\",\"id\":\"test_id_2\"}"); + } + + // Create index record + AuditIndexRecord indexRecord = new AuditIndexRecord(); + indexRecord.setId(UUID.randomUUID().toString()); + indexRecord.setFilePath(testFile.getAbsolutePath()); + + // Mock the handler to succeed + when(mockHandler.logFile(any(File.class))).thenReturn(true); + + // Get access to private sendFile method + java.lang.reflect.Method sendFileMethod = AuditFileQueueSpool.class.getDeclaredMethod( + "sendFile", File.class, AuditIndexRecord.class, int.class); + sendFileMethod.setAccessible(true); + + // Invoke the method + boolean result = (Boolean) sendFileMethod.invoke(spool, testFile, indexRecord, 0); + + // Verify result + assertTrue(result); + verify(mockHandler).logFile(testFile); + } + + @Test + void testAppendToDoneFile() throws Exception { + spool.init(props); + + // Create index record + AuditIndexRecord indexRecord = new AuditIndexRecord(); + indexRecord.setId(UUID.randomUUID().toString()); + indexRecord.setFilePath(new File(tempDir, "test_file.log").getAbsolutePath()); + indexRecord.setStatus(SPOOL_FILE_STATUS.done); + // Use setWriteCompleteTime instead of setCreatedTime which doesn't exist + indexRecord.setWriteCompleteTime(new Date()); + + // Mock the handler flush method + doNothing().when(mockHandler).flush(); + + // Get access to appendToDoneFile method + java.lang.reflect.Method appendToDoneFileMethod = AuditFileQueueSpool.class.getDeclaredMethod( + "appendToDoneFile", AuditIndexRecord.class); + appendToDoneFileMethod.setAccessible(true); + + // Invoke the method + appendToDoneFileMethod.invoke(spool, indexRecord); + + // Verify the done file was created and contains data + File doneFile = spool.indexDoneFile; + assertTrue(doneFile.exists()); + + // Verify the file contains the record ID - use BufferedReader instead of Files.readString + BufferedReader reader = new BufferedReader(new java.io.FileReader(doneFile)); + StringBuilder content = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + content.append(line); + } + reader.close(); + + assertTrue(content.toString().contains(indexRecord.getId())); + + // Verify flush was called + verify(mockHandler, times(2)).flush(); + } + + @Test + void testRunLogAudit() throws Exception { + // Set up quick retry time + props.setProperty("xasecure.audit.filespool.filespool.destination.retry.ms", "100"); // 100ms for quick testing + spool.init(props); + + // Create a spy of the AuditFileQueueSpool to control behavior + AuditFileQueueSpool spoolSpy = spy(spool); + + // Create a test file + File testFile = new File(tempDir, "test_audit_events.log"); + try (PrintWriter writer = new PrintWriter(testFile)) { + writer.println("{\"type\":\"ranger_audit\",\"id\":\"test_id_1\"}"); + writer.println("{\"type\":\"ranger_audit\",\"id\":\"test_id_2\"}"); + } + + // Create an index record for this file and add it to the queue + AuditIndexRecord record = new AuditIndexRecord(); + record.setId(UUID.randomUUID().toString()); + record.setFilePath(testFile.getAbsolutePath()); + record.setStatus(SPOOL_FILE_STATUS.pending); + + // Add to the queue using reflection + java.lang.reflect.Field indexQueueField = AuditFileQueueSpool.class.getDeclaredField("indexQueue"); + indexQueueField.setAccessible(true); + BlockingQueue indexQueue = (BlockingQueue) indexQueueField.get(spoolSpy); + indexQueue.add(record); + + // Set the mock handler to accept events + when(mockHandler.log(anyCollection())).thenReturn(true); + when(mockHandler.logFile(any(File.class))).thenReturn(true); + + // Create a thread to run the runLogAudit method for a short time + AtomicBoolean testFinished = new AtomicBoolean(false); + Thread testThread = new Thread(() -> { + try { + // Call the runLogAudit method using reflection + java.lang.reflect.Method runLogAuditMethod = AuditFileQueueSpool.class.getDeclaredMethod("runLogAudit"); + runLogAuditMethod.setAccessible(true); + + // Set isDrain to true after a delay to stop the loop + new Thread(() -> { + try { + Thread.sleep(500); + java.lang.reflect.Field isDrainField = AuditFileQueueSpool.class.getDeclaredField("isDrain"); + isDrainField.setAccessible(true); + isDrainField.set(spoolSpy, true); + testFinished.set(true); + } catch (Exception e) { + e.printStackTrace(); + } + }).start(); + + // Run the method + runLogAuditMethod.invoke(spoolSpy); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + // Start the thread + testThread.start(); + + // Wait for the test to finish + while (!testFinished.get()) { + Thread.sleep(100); + } + + // Wait for the thread to terminate + testThread.join(1000); + + // Verify interaction with the handler (either log or logFile should be called) + verify(mockHandler, atLeastOnce()).getName(); + } + + // Add helper method to create base properties + private Properties createBaseProperties() { + Properties props = new Properties(); + props.setProperty("xasecure.audit.filespool.filespool.dir", new File(tempDir, "logs").getAbsolutePath()); + props.setProperty("xasecure.audit.filespool.filespool.archive.dir", new File(tempDir, "archive").getAbsolutePath()); + return props; + } +} diff --git a/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileQueueTest.java b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileQueueTest.java new file mode 100644 index 0000000000..cb8a0db6e6 --- /dev/null +++ b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileQueueTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.queue; + +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.AuditHandler; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @generated by copilot + * @description Unit Test cases for AuditFileQueue + * */ +class AuditFileQueueTest { + private AuditHandler mockConsumer; + private AuditFileQueue queue; + private AuditFileQueueSpool mockSpooler; + private Properties props; + + @BeforeEach + void setUp() { + mockConsumer = mock(AuditHandler.class); + queue = new AuditFileQueue(mockConsumer); + mockSpooler = mock(AuditFileQueueSpool.class); + props = new Properties(); + // Use reflection to inject the mock spooler after init + } + + @Test + void testInitSetsUpSpooler() throws Exception { + queue = spy(queue); + java.lang.reflect.Field field = AuditFileQueue.class.getDeclaredField("fileSpooler"); + field.setAccessible(true); + field.set(queue, mockSpooler); + queue.init(props, "xasecure.audit.batch"); + assertNotNull(queue.fileSpooler); + } + + @Test + void testLogSingleEventDelegatesToSpooler() { + AuditEventBase event = mock(AuditEventBase.class); + queue.fileSpooler = mockSpooler; + when(mockSpooler.isSpoolingSuccessful()).thenReturn(true); + + boolean result = queue.log(event); + + verify(mockSpooler).stashLogs(event); + assertTrue(result); + } + + @Test + void testLogMultipleEventsDelegatesToSpooler() { + AuditEventBase event1 = mock(AuditEventBase.class); + AuditEventBase event2 = mock(AuditEventBase.class); + queue.fileSpooler = mockSpooler; + when(mockSpooler.isSpoolingSuccessful()).thenReturn(true); + + List events = Arrays.asList(event1, event2); + boolean result = queue.log(events); + + verify(mockSpooler, times(2)).stashLogs(any(AuditEventBase.class)); + assertTrue(result); + } + + @Test + void testStartStartsConsumerAndSpooler() { + queue.fileSpooler = mockSpooler; + queue.start(); + verify(mockConsumer).start(); + verify(mockSpooler).start(); + } + + @Test + void testStopStopsConsumer() { + queue.stop(); + verify(mockConsumer).stop(); + } + + @Test + void testWaitToCompleteDelegatesToConsumer() { + queue.waitToComplete(); + verify(mockConsumer).waitToComplete(); + } + + @Test + void testWaitToCompleteWithTimeoutDelegatesToConsumer() { + queue.waitToComplete(100L); + verify(mockConsumer).waitToComplete(100L); + } + + @Test + void testFlushDelegatesToConsumer() { + queue.flush(); + verify(mockConsumer).flush(); + } + + @Test + void testLogWithNullEventReturnsFalse() { + // Test null event handling + boolean result = queue.log((AuditEventBase) null); + assertFalse(result); + } + + @Test + void testLogWithNullCollectionReturnsTrue() { + // Test null collection handling + boolean result = queue.log((Collection) null); + assertTrue(result); + } + + @Test + void testLogWhenSpoolingFails() { + // Setup + AuditEventBase event = mock(AuditEventBase.class); + queue.fileSpooler = mockSpooler; + when(mockSpooler.isSpoolingSuccessful()).thenReturn(false); + + // Execute + boolean result = queue.log(event); + + // Verify + verify(mockSpooler).stashLogs(event); + assertFalse(result); + } + + @Test + void testStartWithNullConsumer() { + // Create a queue with null consumer + AuditFileQueue queueWithNullConsumer = new AuditFileQueue(null); + queueWithNullConsumer.fileSpooler = mockSpooler; + + // Should not throw exception when consumer is null + assertDoesNotThrow(() -> queueWithNullConsumer.start()); + + // Spooler should still be started + verify(mockSpooler).start(); + } + + @Test + void testStartWithNullSpooler() { + // Create a queue with null spooler + queue.fileSpooler = null; + + // Should not throw exception when spooler is null + assertDoesNotThrow(() -> queue.start()); + + // Consumer should still be started + verify(mockConsumer).start(); + } + + @Test + void testStopWithNullConsumer() { + // Create a queue with null consumer + AuditFileQueue queueWithNullConsumer = new AuditFileQueue(null); + + // Should not throw exception when consumer is null + assertDoesNotThrow(() -> queueWithNullConsumer.stop()); + } + + @Test + void testWaitToCompleteWithNullConsumer() { + // Create a queue with null consumer + AuditFileQueue queueWithNullConsumer = new AuditFileQueue(null); + + // Should not throw exception when consumer is null + assertDoesNotThrow(() -> queueWithNullConsumer.waitToComplete()); + } + + @Test + void testWaitToCompleteWithTimeoutAndNullConsumer() { + // Create a queue with null consumer + AuditFileQueue queueWithNullConsumer = new AuditFileQueue(null); + + // Should not throw exception when consumer is null + assertDoesNotThrow(() -> queueWithNullConsumer.waitToComplete(100L)); + } + + @Test + void testFlushWithNullConsumer() { + // Create a queue with null consumer + AuditFileQueue queueWithNullConsumer = new AuditFileQueue(null); + + // Should not throw exception when consumer is null + assertDoesNotThrow(() -> queueWithNullConsumer.flush()); + } + + @Test + void testLogMultipleEventsWithSomeFailing() { + // Setup + AuditEventBase event1 = mock(AuditEventBase.class); + AuditEventBase event2 = mock(AuditEventBase.class); + queue.fileSpooler = mockSpooler; + + // First event succeeds, second fails + when(mockSpooler.isSpoolingSuccessful()) + .thenReturn(true) + .thenReturn(false); + + List events = Arrays.asList(event1, event2); + boolean result = queue.log(events); + + // Last result should be returned (false) + assertFalse(result); + verify(mockSpooler, times(2)).stashLogs(any(AuditEventBase.class)); + } +} diff --git a/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileSpoolTest.java b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileSpoolTest.java new file mode 100644 index 0000000000..c7af69e363 --- /dev/null +++ b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditFileSpoolTest.java @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.queue; + +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.model.AuditIndexRecord; +import org.apache.ranger.audit.model.SPOOL_FILE_STATUS; +import org.apache.ranger.audit.provider.AuditHandler; +import org.apache.ranger.audit.provider.MiscUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @generated by copilot + * @description Unit Test cases for AuditFileSpool + * */ +class AuditFileSpoolTest { + private AuditQueue mockQueue; + private AuditHandler mockHandler; + private AuditFileSpool spool; + private File tempDir; + private Properties props; + + @BeforeEach + void setUp() throws Exception { + mockQueue = mock(AuditQueue.class); + when(mockQueue.getName()).thenReturn("testQueue"); + when(mockQueue.getMaxBatchSize()).thenReturn(10); + + mockHandler = mock(AuditHandler.class); + when(mockHandler.getName()).thenReturn("testHandler"); + + tempDir = Files.createTempDirectory("auditspooltest").toFile(); + tempDir.deleteOnExit(); + + props = new Properties(); + props.setProperty("xasecure.audit.filespool.filespool.dir", tempDir.getAbsolutePath()); + props.setProperty("xasecure.audit.filespool.filespool.archive.dir", new File(tempDir, "archive").getAbsolutePath()); + props.setProperty("xasecure.audit.filespool.filespool.index.filename", "index_test.json"); + + spool = new AuditFileSpool(mockQueue, mockHandler); + } + + @AfterEach + void tearDown() { + if (spool != null) { + spool.stop(); + } + if (tempDir != null && tempDir.exists()) { + File[] files = tempDir.listFiles(); + if (files != null) { + for (File f : files) { + f.delete(); + } + } + tempDir.delete(); + } + } + + @Test + void testInitCreatesFilesAndFolders() { + boolean result = spool.init(props, "xasecure.audit.filespool"); + assertTrue(result); + assertTrue(spool.logFolder.exists()); + assertTrue(spool.archiveFolder.exists()); + assertTrue(spool.indexFile.exists()); + assertTrue(spool.indexDoneFile.exists()); + } + + @Test + void testStartAndStopThread() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + spool.start(); + assertNotNull(spool.destinationThread); + assertTrue(spool.destinationThread.isAlive()); + spool.stop(); + assertNull(spool.destinationThread); + } + + @Test + void testStashSingleEvent() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + AuditEventBase event = mock(AuditEventBase.class); + spool.stashLogs(event); + assertTrue(spool.isPending()); + } + + @Test + void testStashMultipleEvents() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + AuditEventBase event1 = mock(AuditEventBase.class); + AuditEventBase event2 = mock(AuditEventBase.class); + List events = Arrays.asList(event1, event2); + spool.stashLogs(events); + assertTrue(spool.isPending()); + } + + @Test + void testFlushDoesNotThrow() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + assertDoesNotThrow(() -> spool.flush()); + } + + @Test + void testIsPendingBeforeAndAfterInit() { + assertFalse(spool.isPending()); + assertTrue(spool.init(props, "xasecure.audit.filespool")); + assertFalse(spool.isPending()); + } + + @Test + void testDoubleInitReturnsTrue() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + assertTrue(spool.init(props, "xasecure.audit.filespool")); // Should log error and return true + } + + @Test + void testStashLogsStringCollection() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + List events = Arrays.asList("event1", "event2", "event3"); + assertDoesNotThrow(() -> spool.stashLogsString(events)); + } + + @Test + void testStashLogsStringAfterStop() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + spool.stop(); + assertDoesNotThrow(() -> spool.stashLogsString("event after stop")); + } + + @Test + void testStashLogsAfterStop() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + spool.stop(); + AuditEventBase event = mock(AuditEventBase.class); + assertDoesNotThrow(() -> spool.stashLogs(event)); + } + + @Test + void testStartWithoutInit() { + AuditFileSpool uninitSpool = new AuditFileSpool(mockQueue, mockHandler); + uninitSpool.start(); + assertNull(uninitSpool.destinationThread); + } + + @Test + void testFlushWithoutInit() { + AuditFileSpool uninitSpool = new AuditFileSpool(mockQueue, mockHandler); + assertDoesNotThrow(uninitSpool::flush); + } + + @Test + void testIsPendingWithPendingIndexRecord() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + // Simulate a pending record by setting isPending manually + spool.isPending = true; + assertTrue(spool.isPending()); + } + + @Test + void testGetLastAttemptTimeDelta() { + assertTrue(spool.init(props, "xasecure.audit.filespool")); + assertEquals(0, spool.getLastAttemptTimeDelta()); + spool.lastAttemptTime = System.currentTimeMillis() - 1000; + assertTrue(spool.getLastAttemptTimeDelta() >= 1000); + } + + @Test + void testRollOverSpoolFileByTime() throws Exception { + // Set up a very short rollover time + props.setProperty("xasecure.audit.filespool.filespool.file.rollover.sec", "1"); // 1 second for quick testing + spool.init(props, "xasecure.audit.filespool"); + + // Access fileRolloverSec using reflection to ensure it's set properly + java.lang.reflect.Field fileRolloverSecField = AuditFileSpool.class.getDeclaredField("fileRolloverSec"); + fileRolloverSecField.setAccessible(true); + fileRolloverSecField.set(spool, 1); // 1 second for quick testing + + // Create initial log file + AuditEventBase mockEvent = mock(AuditEventBase.class); + spool.stashLogs(mockEvent); + + // Access the current writer record using reflection + java.lang.reflect.Field writerRecordField = AuditFileSpool.class.getDeclaredField("currentWriterIndexRecord"); + writerRecordField.setAccessible(true); + AuditIndexRecord firstRecord = (AuditIndexRecord) writerRecordField.get(spool); + String firstFilePath = firstRecord.getFilePath(); + + // Sleep to trigger rollover time + Thread.sleep(1500); + + // Write another event which should trigger rollover + spool.stashLogs(mockEvent); + + // Get the new writer record + AuditIndexRecord secondRecord = (AuditIndexRecord) writerRecordField.get(spool); + + // Assert file has changed + assertNotEquals(firstFilePath, secondRecord.getFilePath()); + + // Check if the first file is in the queue + java.lang.reflect.Field indexQueueField = AuditFileSpool.class.getDeclaredField("indexQueue"); + indexQueueField.setAccessible(true); + BlockingQueue indexQueue = (BlockingQueue) indexQueueField.get(spool); + + boolean fileInQueue = false; + for (AuditIndexRecord record : indexQueue) { + if (record.getFilePath().equals(firstFilePath)) { + fileInQueue = true; + break; + } + } + assertTrue(fileInQueue, "First file should be in the index queue after rollover"); + } + + @Test + void testRunLogAudit() throws Exception { + // Set up quick retry time + props.setProperty("xasecure.audit.filespool.filespool.destination.retry.ms", "100"); // 100ms for quick testing + spool.init(props, "xasecure.audit.filespool"); + + // Create a test file + File testFile = new File(tempDir, "test_audit_events.log"); + try (PrintWriter writer = new PrintWriter(testFile)) { + writer.println("{\"type\":\"ranger_audit\",\"id\":\"test_id_1\"}"); + writer.println("{\"type\":\"ranger_audit\",\"id\":\"test_id_2\"}"); + } + + // Create an index record for this file and add it to the queue + AuditIndexRecord record = new AuditIndexRecord(); + record.setId(UUID.randomUUID().toString()); + record.setFilePath(testFile.getAbsolutePath()); + record.setStatus(SPOOL_FILE_STATUS.pending); + + // Add to the queue using reflection + java.lang.reflect.Field indexQueueField = AuditFileSpool.class.getDeclaredField("indexQueue"); + indexQueueField.setAccessible(true); + BlockingQueue indexQueue = (BlockingQueue) indexQueueField.get(spool); + indexQueue.add(record); + + // Set the mock handler to accept events + when(mockHandler.log(anyCollection())).thenReturn(true); + when(mockHandler.logFile(any(File.class))).thenReturn(true); + + // Get access to runLogAudit method + java.lang.reflect.Method runLogAuditMethod = AuditFileSpool.class.getDeclaredMethod("runLogAudit"); + runLogAuditMethod.setAccessible(true); + + // Create a flag for stopping the test + java.util.concurrent.atomic.AtomicBoolean stopTest = new java.util.concurrent.atomic.AtomicBoolean(false); + + // Create a thread to run runLogAudit for a limited time + Thread testThread = new Thread(() -> { + try { + // Set isDrain to true after a delay to stop processing + new Thread(() -> { + try { + Thread.sleep(1000); + java.lang.reflect.Field isDrainField = AuditFileSpool.class.getDeclaredField("isDrain"); + isDrainField.setAccessible(true); + isDrainField.set(spool, true); + stopTest.set(true); + } catch (Exception e) { + e.printStackTrace(); + } + }).start(); + + // Run the audit processing method + runLogAuditMethod.invoke(spool); + } catch (Exception e) { + // Ignore exceptions during test shutdown + } + }); + + // Start the thread and wait for completion + testThread.start(); + while (!stopTest.get()) { + Thread.sleep(100); + } + testThread.join(2000); + + // Verify handler interactions + verify(mockHandler, atLeastOnce()).getName(); + } + + @Test + void testLoadIndexFile() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Create a test index file with some records + try (PrintWriter writer = new PrintWriter(spool.indexFile)) { + writer.println("{\"id\":\"test-id-1\",\"filePath\":\"/test/path1\",\"status\":\"pending\"}"); + writer.println("{\"id\":\"test-id-2\",\"filePath\":\"/test/path2\",\"status\":\"write_inprogress\"}"); + } + + // Clear existing records + java.lang.reflect.Field indexRecordsField = AuditFileSpool.class.getDeclaredField("indexRecords"); + indexRecordsField.setAccessible(true); + List indexRecords = (List) indexRecordsField.get(spool); + indexRecords.clear(); + + // Call loadIndexFile + java.lang.reflect.Method loadIndexFileMethod = AuditFileSpool.class.getDeclaredMethod("loadIndexFile"); + loadIndexFileMethod.setAccessible(true); + loadIndexFileMethod.invoke(spool); + + // Verify records were loaded + assertTrue(indexRecords.size() >= 2); + boolean foundId1 = false; + boolean foundId2 = false; + + for (AuditIndexRecord record : indexRecords) { + if ("test-id-1".equals(record.getId())) { + foundId1 = true; } + if ("test-id-2".equals(record.getId())) { + foundId2 = true; } + } + + assertTrue(foundId1); + assertTrue(foundId2); + } + + @Test + void testSendFileAndEvents() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Create test data + File testFile = new File(tempDir, "test_log_event.log"); + try (PrintWriter writer = new PrintWriter(testFile)) { + writer.println("{ \"event\": \"test event 1\" }"); + writer.println("{ \"event\": \"test event 2\" }"); + } + + // Setup mock behavior + when(mockHandler.logFile(any(File.class))).thenReturn(true); + when(mockHandler.logJSON(anyCollection())).thenReturn(true); + + // Create a test index record + AuditIndexRecord indexRecord = new AuditIndexRecord(); + indexRecord.setId(UUID.randomUUID().toString()); + indexRecord.setFilePath(testFile.getAbsolutePath()); + indexRecord.setStatus(SPOOL_FILE_STATUS.pending); + + // Access sendEvent method by its actual name + java.lang.reflect.Method sendEventMethod = AuditFileSpool.class.getDeclaredMethod( + "sendEvent", List.class, AuditIndexRecord.class, int.class); + sendEventMethod.setAccessible(true); + + // Test the method with some lines + List lines = new ArrayList<>(); + lines.add("{ \"event\": \"test event 1\" }"); + lines.add("{ \"event\": \"test event 2\" }"); + + boolean result = (Boolean) sendEventMethod.invoke(spool, lines, indexRecord, 3); + assertTrue(result); + + // Verify handler was called + verify(mockHandler).logJSON(lines); + } + + @Test + void testLogEventWithBufferedReader() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Create a file with audit events + File testFile = new File(tempDir, "test_log_event.log"); + try (PrintWriter writer = new PrintWriter(testFile)) { + writer.println("{ \"event\": \"test event 1\" }"); + writer.println("{ \"event\": \"test event 2\" }"); + } + + // Create a BufferedReader for the file + BufferedReader reader = new BufferedReader(new FileReader(testFile)); + + // Prepare for the runLogAudit method which is what actually processes events + // The private method we need to test is actually part of the runLogAudit flow + + // Setup mock behavior + when(mockHandler.logJSON(anyCollection())).thenReturn(true); + + // Since we can't directly test logEvent (as it doesn't exist as a separate method), + // we'll test the main processing logic in runLogAudit with a custom setup + + // Set up the currentConsumerIndexRecord field + AuditIndexRecord record = new AuditIndexRecord(); + record.setId(UUID.randomUUID().toString()); + record.setFilePath(testFile.getAbsolutePath()); + record.setLinePosition(0); + record.setStatus(SPOOL_FILE_STATUS.pending); + + java.lang.reflect.Field currentConsumerIndexRecordField = + AuditFileSpool.class.getDeclaredField("currentConsumerIndexRecord"); + currentConsumerIndexRecordField.setAccessible(true); + currentConsumerIndexRecordField.set(spool, record); + + // Now we can use the sendEvent method which is used internally + java.lang.reflect.Method sendEventMethod = AuditFileSpool.class.getDeclaredMethod( + "sendEvent", List.class, AuditIndexRecord.class, int.class); + sendEventMethod.setAccessible(true); + + List lines = new ArrayList<>(); + String line; + while ((line = reader.readLine()) != null) { + lines.add(line); + } + + boolean result = (Boolean) sendEventMethod.invoke(spool, lines, record, lines.size()); + assertTrue(result); + + // Verify handler was called + verify(mockHandler).logJSON(lines); + + reader.close(); + } + + @Test + void testArchiveCleanup() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Create archive files with dates in the past + File archiveDir = spool.archiveFolder; + + // Create more than maxArchiveFiles (default 100) files + // We'll create 10 for the test to keep it quick + int maxArchiveFiles = 5; // Testing with a small number + + // Set maxArchiveFiles using reflection + java.lang.reflect.Field maxArchiveFilesField = + AuditFileSpool.class.getDeclaredField("maxArchiveFiles"); + maxArchiveFilesField.setAccessible(true); + maxArchiveFilesField.set(spool, maxArchiveFiles); + + // Create archive files + List createdFiles = new ArrayList<>(); + for (int i = 0; i < maxArchiveFiles + 5; i++) { + File archiveFile = new File(archiveDir, "test_archive_" + i + ".log"); + archiveFile.createNewFile(); + createdFiles.add(archiveFile); + } + + // Create test index done file entries for these files + try (PrintWriter writer = new PrintWriter(spool.indexDoneFile)) { + for (int i = 0; i < maxArchiveFiles + 5; i++) { + AuditIndexRecord record = new AuditIndexRecord(); + record.setId(UUID.randomUUID().toString()); + record.setFilePath(createdFiles.get(i).getAbsolutePath()); + record.setStatus(SPOOL_FILE_STATUS.done); + record.setWriteCompleteTime(new Date()); + writer.println(MiscUtil.stringify(record)); + } + } + + // Call appendToDoneFile which triggers archive cleanup + AuditIndexRecord testRecord = new AuditIndexRecord(); + testRecord.setId(UUID.randomUUID().toString()); + testRecord.setFilePath(new File(tempDir, "test_trigger.log").getAbsolutePath()); + testRecord.setStatus(SPOOL_FILE_STATUS.done); + testRecord.setWriteCompleteTime(new Date()); + + // Access the appendToDoneFile method + java.lang.reflect.Method appendToDoneFileMethod = + AuditFileSpool.class.getDeclaredMethod("appendToDoneFile", AuditIndexRecord.class); + appendToDoneFileMethod.setAccessible(true); + + // Call the method which should trigger archive cleanup + appendToDoneFileMethod.invoke(spool, testRecord); + + // Verify that files were cleaned up (should only be maxArchiveFiles files left) + File[] remainingFiles = archiveDir.listFiles(file -> file.getName().endsWith(".log")); + assertNotNull(remainingFiles); + assertTrue(remainingFiles.length <= maxArchiveFiles + 1); // +1 for the test_trigger.log + } + + @Test + void testCloseFileIfNeeded() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Create a test file + spool.stashLogsString("test event"); + + // Get access to fields + java.lang.reflect.Field logWriterField = + AuditFileSpool.class.getDeclaredField("logWriter"); + logWriterField.setAccessible(true); + + java.lang.reflect.Field currentWriterIndexRecordField = + AuditFileSpool.class.getDeclaredField("currentWriterIndexRecord"); + currentWriterIndexRecordField.setAccessible(true); + + // Verify writer was created + assertNotNull(logWriterField.get(spool)); + assertNotNull(currentWriterIndexRecordField.get(spool)); + + // Get access to indexQueue + java.lang.reflect.Field indexQueueField = + AuditFileSpool.class.getDeclaredField("indexQueue"); + indexQueueField.setAccessible(true); + BlockingQueue indexQueue = + (BlockingQueue) indexQueueField.get(spool); + + // Remember queue size + int initialQueueSize = indexQueue.size(); + + // Access closeFileIfNeeded method + java.lang.reflect.Method closeFileIfNeededMethod = + AuditFileSpool.class.getDeclaredMethod("closeFileIfNeeded"); + closeFileIfNeededMethod.setAccessible(true); + + // Set isDrain to true to force close + java.lang.reflect.Field isDrainField = + AuditFileSpool.class.getDeclaredField("isDrain"); + isDrainField.setAccessible(true); + isDrainField.set(spool, true); + + // Call the method + closeFileIfNeededMethod.invoke(spool); + + // Verify file was closed + assertNull(logWriterField.get(spool)); + + // And that record was moved to the queue + assertTrue(indexQueue.size() > initialQueueSize); + } + + @Test + void testPrintIndex() throws Exception { + spool.init(props, "xasecure.audit.filespool"); + + // Create a test record + AuditIndexRecord record = new AuditIndexRecord(); + record.setId(UUID.randomUUID().toString()); + record.setFilePath(new File(tempDir, "test_log.log").getAbsolutePath()); + record.setStatus(SPOOL_FILE_STATUS.pending); + + // Access indexRecords field + java.lang.reflect.Field indexRecordsField = + AuditFileSpool.class.getDeclaredField("indexRecords"); + indexRecordsField.setAccessible(true); + List indexRecords = + (List) indexRecordsField.get(spool); + + // Add our test record + indexRecords.add(record); + + // Get access to printIndex method + java.lang.reflect.Method printIndexMethod = + AuditFileSpool.class.getDeclaredMethod("printIndex"); + printIndexMethod.setAccessible(true); + + // Call printIndex - just verify it doesn't throw an exception + assertDoesNotThrow(() -> printIndexMethod.invoke(spool)); + } +} diff --git a/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditQueueTest.java b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditQueueTest.java new file mode 100644 index 0000000000..986068e5c6 --- /dev/null +++ b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditQueueTest.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.queue; + +import org.apache.ranger.audit.destination.AuditDestination; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.AuditHandler; +import org.apache.ranger.audit.provider.BaseAuditHandler; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @generated by copilot + * @description Unit Test cases for AuditQueue + * */ +class AuditQueueTest { + static class TestAuditQueue extends AuditQueue { + private String name = "testQueue"; + + public TestAuditQueue(AuditHandler consumer) { + super(consumer); + } + + @Override + public String getName() { + return name; } + + @Override + public void setName(String name) { + this.name = name; + super.setName(name); } + + public boolean log(Object event) { + return true; } + + public boolean log(List events) { + return true; } + + @Override + public boolean log(Collection events) { + return true; } + + @Override + public void start() {} + + @Override + public void stop() {} + } + + private AuditHandler mockConsumer; + private TestAuditQueue queue; + + @BeforeEach + void setUp() { + mockConsumer = mock(AuditHandler.class); + queue = new TestAuditQueue(mockConsumer); + } + + @Test + void testInitSetsProperties() { + Properties props = new Properties(); + props.setProperty("test.batch.size", "123"); + props.setProperty("test.queue.size", "456"); + props.setProperty("test.batch.interval.ms", "789"); + queue.init(props, "test"); + assertEquals(123, queue.getMaxBatchSize()); + assertEquals(456, queue.getMaxQueueSize()); + assertEquals(789, queue.getMaxBatchInterval()); + } + + @Test + void testSetAndGetMaxBatchSize() { + queue.setMaxBatchSize(42); + assertEquals(42, queue.getMaxBatchSize()); + } + + @Test + void testSetAndGetMaxQueueSize() { + queue.setMaxQueueSize(99); + assertEquals(99, queue.getMaxQueueSize()); + } + + @Test + void testSetAndGetMaxBatchInterval() { + queue.setMaxBatchInterval(77); + assertEquals(77, queue.getMaxBatchInterval()); + } + + @Test + void testFlushDelegatesToConsumer() { + queue.flush(); + verify(mockConsumer).flush(); + } + + @Test + void testWaitToCompleteDelegatesToConsumer() { + queue.waitToComplete(); + verify(mockConsumer).waitToComplete(-1); + queue.waitToComplete(100L); + verify(mockConsumer).waitToComplete(100L); + } + + @Test + void testSetDrainAndIsDrain() { + queue.setDrain(true); + assertTrue(queue.isDrain()); + queue.setDrain(false); + assertFalse(queue.isDrain()); + } + + @Test + void testDoubleInitDoesNotThrow() { + Properties props = new Properties(); + queue.init(props, "test"); + assertDoesNotThrow(() -> queue.init(props, "test")); + } + + @Test + void testSetNameUpdatesConsumerParentPath() { + BaseAuditHandler baseHandler = mock(BaseAuditHandler.class); + TestAuditQueue q = new TestAuditQueue(baseHandler); + q.setName("newName"); + verify(baseHandler, atLeastOnce()).setParentPath("newName"); + } + + @Test + void testSetParentPathUpdatesConsumerParentPath() { + BaseAuditHandler baseHandler = mock(BaseAuditHandler.class); + TestAuditQueue q = new TestAuditQueue(baseHandler); + q.setParentPath("parentPath"); + verify(baseHandler, atLeastOnce()).setParentPath("testQueue"); + } + + @Test + void testGetFinalPathWithBaseAuditHandler() { + BaseAuditHandler baseHandler = mock(BaseAuditHandler.class); + when(baseHandler.getFinalPath()).thenReturn("finalPath"); + TestAuditQueue q = new TestAuditQueue(baseHandler); + assertEquals("finalPath", q.getFinalPath()); + } + + @Test + void testGetFinalPathWithNullConsumer() { + TestAuditQueue q = new TestAuditQueue(null); + assertEquals("testQueue", q.getFinalPath()); + } + + @Test + void testFileSpoolerInitFailure() { + Properties props = new Properties(); + props.setProperty("test.filespool.enable", "true"); + props.setProperty("test.filespool.local.dir", System.getProperty("java.io.tmpdir") + "/audit_spool_test"); + + // Mock the AuditFileSpool to simulate initialization failure + AuditFileSpool mockSpool = mock(AuditFileSpool.class); + when(mockSpool.init(any(), any())).thenReturn(false); + + // Use reflection to replace fileSpooler + try { + Field fileSpoolerField = AuditQueue.class.getDeclaredField("fileSpooler"); + fileSpoolerField.setAccessible(true); + fileSpoolerField.set(queue, mockSpool); + } catch (Exception e) { + fail("Failed to set up test: " + e.getMessage()); + } + + queue.init(props, "test"); + + assertFalse(queue.fileSpoolerEnabled); + } + + @Test + void testGetFinalPath() { + // Case 1: consumer is a BaseAuditHandler + BaseAuditHandler baseConsumer = mock(BaseAuditHandler.class); + when(baseConsumer.getFinalPath()).thenReturn("finalPath"); + + TestAuditQueue queueWithBaseHandler = new TestAuditQueue(baseConsumer); + queueWithBaseHandler.setName("testQueue"); + + assertEquals("finalPath", queueWithBaseHandler.getFinalPath()); + + // Case 2: consumer is a regular AuditHandler + AuditHandler regularConsumer = mock(AuditHandler.class); + when(regularConsumer.getName()).thenReturn("consumerName"); + + TestAuditQueue queueWithRegularHandler = new TestAuditQueue(regularConsumer); + queueWithRegularHandler.setName("testQueue"); + + assertEquals("consumerName", queueWithRegularHandler.getFinalPath()); + + // Case 3: no consumer + TestAuditQueue queueNoConsumer = new TestAuditQueue(null); + queueNoConsumer.setName("testQueueNoConsumer"); + + assertEquals("testQueueNoConsumer", queueNoConsumer.getFinalPath()); + } + + @Test + void testWaitToComplete() { + AuditHandler mockHandler = mock(AuditHandler.class); + TestAuditQueue queueWithHandler = new TestAuditQueue(mockHandler); + + // Test waitToComplete without timeout + queueWithHandler.waitToComplete(); + verify(mockHandler).waitToComplete(-1); + + // Test waitToComplete with timeout + queueWithHandler.waitToComplete(5000); + verify(mockHandler).waitToComplete(5000); + + // Test with null consumer + TestAuditQueue queueWithNull = new TestAuditQueue(null); + queueWithNull.waitToComplete(); // Should not throw exception + queueWithNull.waitToComplete(5000); // Should not throw exception + } + + @Test + void testFlush() { + AuditHandler mockHandler = mock(AuditHandler.class); + TestAuditQueue queueWithHandler = new TestAuditQueue(mockHandler); + + queueWithHandler.flush(); + verify(mockHandler).flush(); + + // Test with null consumer + TestAuditQueue queueWithNull = new TestAuditQueue(null); + queueWithNull.flush(); // Should not throw exception + } + + @Test + void testConsumerDestinationDetection() { + // Test with a consumer that is an AuditDestination + AuditDestination mockDestination = mock(AuditDestination.class); + TestAuditQueue queueWithDestination = new TestAuditQueue(mockDestination); + + assertTrue(queueWithDestination.isConsumerDestination); + + // Test with a regular consumer + AuditHandler mockHandler = mock(AuditHandler.class); + TestAuditQueue queueWithHandler = new TestAuditQueue(mockHandler); + + assertFalse(queueWithHandler.isConsumerDestination); + } +} diff --git a/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditSummaryQueueTest.java b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditSummaryQueueTest.java new file mode 100644 index 0000000000..4a0ed13e9d --- /dev/null +++ b/agents-audit/core/src/test/java/org/apache/ranger/audit/queue/AuditSummaryQueueTest.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.queue; + +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.AuditHandler; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @generated by copilot + * @description Unit Test cases for AuditSummaryQueue + * */ +class AuditSummaryQueueTest { + private AuditHandler mockHandler; + private AuditSummaryQueue queue; + private Properties props; + + @BeforeEach + void setUp() { + mockHandler = mock(AuditHandler.class); + when(mockHandler.getName()).thenReturn("mockHandler"); + queue = new AuditSummaryQueue(mockHandler); + + props = new Properties(); + props.setProperty("test.summary.interval.ms", "1234"); + } + + @Test + void testInitSetsSummaryInterval() { + queue.init(props, "test"); + // Reflection to access private field + try { + Field field = AuditSummaryQueue.class.getDeclaredField("maxSummaryIntervalMs"); + field.setAccessible(true); + int interval = (int) field.get(queue); + assertEquals(1234, interval); + } catch (Exception e) { + fail("Reflection failed: " + e.getMessage()); + } + } + + @Test + void testLogSingleEvent() { + AuditEventBase event = mock(AuditEventBase.class); + assertTrue(queue.log(event)); + } + + @Test + void testLogMultipleEvents() { + AuditEventBase event1 = mock(AuditEventBase.class); + AuditEventBase event2 = mock(AuditEventBase.class); + List events = Arrays.asList(event1, event2); + assertTrue(queue.log(events)); + } + + @Test + void testStartAndStopThread() { + queue.start(); + assertNotNull(queue.consumerThread); + assertTrue(queue.consumerThread.isAlive()); + queue.stop(); + assertNull(queue.consumerThread); + } + + @Test + void testLogReturnsFalseWhenQueueFull() throws Exception { + queue.setMaxQueueSize(0); // Set max size to 0 to simulate full queue + AuditEventBase event = mock(AuditEventBase.class); + assertFalse(queue.log(event)); + } + + @Test + void testDoubleInitDoesNotThrow() { + queue.init(props, "test"); + assertDoesNotThrow(() -> queue.init(props, "test")); + } + + @Test + void testLogAfterStopDoesNotThrow() { + queue.start(); + queue.stop(); + AuditEventBase event = mock(AuditEventBase.class); + assertDoesNotThrow(() -> queue.log(event)); + } + + @Test + void testStartIsIdempotent() { + queue.start(); + Thread firstThread = queue.consumerThread; + assertNotNull(firstThread); + assertTrue(firstThread.isAlive()); + + // Second start should not throw and should create a new thread + assertDoesNotThrow(() -> queue.start()); + Thread secondThread = queue.consumerThread; + assertNotNull(secondThread); + assertTrue(secondThread.isAlive()); + } + + @Test + void testStopIsIdempotent() { + queue.start(); + queue.stop(); + assertNull(queue.consumerThread); + // Second stop should not throw + assertDoesNotThrow(() -> queue.stop()); + } + + @Test + void testLogEmptyCollectionReturnsTrue() { + assertTrue(queue.log(Collections.emptyList())); + } + + @Test + void testRunLogAuditDispatchesSummary() { + AuditEventBase event = mock(AuditEventBase.class); + when(event.getEventKey()).thenReturn("key1"); + when(event.getEventTime()).thenReturn(new Date()); + queue.log(event); + + // Set drain to true to force dispatch + queue.setDrain(true); + + // Specify the type to resolve ambiguity + when(mockHandler.log(any(AuditEventBase.class))).thenReturn(true); + queue.runLogAudit(); + + // After dispatch, summaryMap should be empty + try { + Field field = AuditSummaryQueue.class.getDeclaredField("summaryMap"); + field.setAccessible(true); + Map map = (Map) field.get(queue); + assertTrue(map.isEmpty()); + } catch (Exception e) { + fail("Reflection failed: " + e.getMessage()); + } + verify(mockHandler, atLeastOnce()).log(any(AuditEventBase.class)); + } + + @Test + void testRunLogAuditWithInterruptedException() throws Exception { + // Setup mock consumer + when(mockHandler.log(any(AuditEventBase.class))).thenReturn(true); + + // Start the queue + queue.start(); + + // Add an event to the queue + AuditEventBase event = mock(AuditEventBase.class); + when(event.getEventKey()).thenReturn("test-key"); + when(event.getEventTime()).thenReturn(new Date()); + queue.log(event); + + // Interrupt the thread + queue.consumerThread.interrupt(); + + // Wait for a short time + Thread.sleep(200); + + // Thread should still be alive after interrupt since it catches InterruptedException + assertTrue(queue.consumerThread.isAlive()); + + // Cleanup + queue.stop(); + } + + @Test + void testMultipleEventsWithSameKey() throws Exception { + // Create events with the same key but different times + AuditEventBase event1 = mock(AuditEventBase.class); + when(event1.getEventKey()).thenReturn("same-key"); + Date startTime = new Date(System.currentTimeMillis() - 10000); // 10 seconds ago + when(event1.getEventTime()).thenReturn(startTime); + + AuditEventBase event2 = mock(AuditEventBase.class); + when(event2.getEventKey()).thenReturn("same-key"); + Date endTime = new Date(); // Now + when(event2.getEventTime()).thenReturn(endTime); + + // Add events to queue + queue.log(event1); + queue.log(event2); + + // Set drain to true to force dispatch + queue.setDrain(true); + + // Mock handler to return success + when(mockHandler.log(any(AuditEventBase.class))).thenReturn(true); + + // Run the audit processing + queue.runLogAudit(); + + // Verify the event was updated with combined info + verify(event1).setEventCount(2); // Should count both events + + // Verify duration is approximately 10 seconds (allow some buffer) + ArgumentCaptor durationCaptor = ArgumentCaptor.forClass(Long.class); + verify(event1).setEventDurationMS(durationCaptor.capture()); + long capturedDuration = durationCaptor.getValue(); + assertTrue(capturedDuration >= 9000 && capturedDuration <= 11000, + "Duration should be around 10000ms but was " + capturedDuration); + + // Verify the event was logged + verify(mockHandler).log(event1); + } + + @Test + void testTimeoutTriggersDispatch() throws Exception { + // Set a very short summary interval + Field maxSummaryIntervalMsField = AuditSummaryQueue.class.getDeclaredField("maxSummaryIntervalMs"); + maxSummaryIntervalMsField.setAccessible(true); + maxSummaryIntervalMsField.set(queue, 100); // 100ms interval + + // Create a test event + AuditEventBase event = mock(AuditEventBase.class); + when(event.getEventKey()).thenReturn("test-key"); + when(event.getEventTime()).thenReturn(new Date()); + + // Add event to queue + queue.log(event); + + // Mock handler to return success + when(mockHandler.log(any(AuditEventBase.class))).thenReturn(true); + + // Start processing + queue.start(); + + // Wait for interval to elapse and processing to occur + Thread.sleep(200); + + // Verify the event was logged + verify(mockHandler, timeout(500).atLeastOnce()).log(any(AuditEventBase.class)); + + // Cleanup + queue.stop(); + } + + @Test + void testQueueDrainToWithMultipleEvents() { + // Create more events than initial fetch + List events = new ArrayList<>(); + for (int i = 0; i < 500; i++) { + AuditEventBase event = mock(AuditEventBase.class); + when(event.getEventKey()).thenReturn("key-" + i); + when(event.getEventTime()).thenReturn(new Date()); + events.add(event); + } + + // Add all events to queue + assertTrue(queue.log(events)); + + // Set drain to true + queue.setDrain(true); + + // Mock handler to return success + when(mockHandler.log(any(AuditEventBase.class))).thenReturn(true); + + // Run the audit processing + queue.runLogAudit(); + + // Verify all events were processed + verify(mockHandler, times(500)).log(any(AuditEventBase.class)); + } + + @Test + void testEmptySummaryMapStillExits() { + // Set drain to true with empty queue and summary map + queue.setDrain(true); + + // Run the audit processing + queue.runLogAudit(); + + // Verify consumer stop was called + verify(mockHandler).stop(); + } + + @Test + void testConsumerStopException() { + // Set up mock to throw exception on stop + doThrow(new RuntimeException("Test exception")).when(mockHandler).stop(); + + // Set drain to true + queue.setDrain(true); + + // Run the audit processing - should catch exception + assertDoesNotThrow(() -> queue.runLogAudit()); + + // Verify consumer stop was called + verify(mockHandler).stop(); + } +}