From bce91b966fbe74d48d9752d8805750583388d72b Mon Sep 17 00:00:00 2001 From: Mark Kasaboski Date: Mon, 18 Nov 2024 17:32:02 -0500 Subject: [PATCH] Refactors ingestion script adding Splunk interfaces and refactors unit test --- packages/flare/bin/cron_job_ingest_events.py | 386 ++++++++------ .../flare/tests/bin/test_ingest_events.py | 473 ++++++++---------- 2 files changed, 425 insertions(+), 434 deletions(-) diff --git a/packages/flare/bin/cron_job_ingest_events.py b/packages/flare/bin/cron_job_ingest_events.py index 700e5db..4a985e3 100644 --- a/packages/flare/bin/cron_job_ingest_events.py +++ b/packages/flare/bin/cron_job_ingest_events.py @@ -7,6 +7,7 @@ from typing import Any from typing import Iterator from typing import Optional +from typing import Protocol sys.path.insert(0, os.path.join(os.path.dirname(__file__), "vendor")) @@ -21,222 +22,280 @@ from constants import PasswordKeys from flare import FlareAPI from logger import Logger -from vendor.splunklib.client import Service +from vendor.splunklib.client import Entity -def main(logger: Logger, app: client.Application) -> None: - create_collection(app=app) +class StoragePasswords(Protocol): + def list(self) -> list: + pass - # To avoid cron jobs from doing the same work at the same time, exit new cron jobs if a cron job is already doing work - last_fetched_timestamp = get_last_fetched(app) - if last_fetched_timestamp and last_fetched_timestamp > ( - datetime.now() - CRON_JOB_THRESHOLD_SINCE_LAST_FETCH - ): - logger.info( - f"Fetched events less than {int(CRON_JOB_THRESHOLD_SINCE_LAST_FETCH.seconds / 60)} minutes ago, exiting" - ) - return - api_key = get_api_key(app=app) - tenant_id = get_tenant_id(app=app) - ingest_metadata_only = get_ingest_metadata_only(app=app) +class KVStoreCollections(Protocol): + def __getitem__(self, key: str) -> Entity: + pass - save_last_fetched(app=app) - events_fetchd_count = 0 - for event, next_token in fetch_feed( - logger=logger, - app=app, - api_key=api_key, - tenant_id=tenant_id, - ingest_metadata_only=ingest_metadata_only, - ): - save_last_fetched(app=app) + def __contains__(self, item: str) -> bool: + pass - save_start_date(app=app, tenant_id=tenant_id) - save_next(app=app, tenant_id=tenant_id, next=next_token) + def create(self, name: str, fields: dict) -> dict: + pass - print(json.dumps(event), flush=True) - events_fetchd_count += 1 +class KVStoreCollectionData(Protocol): + def insert(self, data: str) -> dict: + pass - logger.info(f"Fetchd {events_fetchd_count} events") + def update(self, id: str, data: str) -> dict: + pass + def query(self, **query: dict) -> list: + pass -def get_storage_password_value( - app: client.Application, password_key: str -) -> Optional[str]: - for item in app.service.storage_passwords.list(): - if item.content.username == password_key: - return item.clear_password - return None +class Collection(Protocol): + def __getitem__(self, key: str) -> Entity: + pass -def get_api_key(app: client.Application) -> str: - api_key = get_storage_password_value( - app=app, password_key=PasswordKeys.API_KEY.value - ) - if not api_key: - raise Exception("API key not found") - return api_key +class Service(Protocol): + @property + def apps(self) -> Collection: + pass + @property + def storage_passwords(self) -> StoragePasswords: + pass -def get_tenant_id(app: client.Application) -> int: - stored_tenant_id = get_storage_password_value( - app=app, password_key=PasswordKeys.TENANT_ID.value - ) - try: - tenant_id = int(stored_tenant_id) if stored_tenant_id is not None else None - except Exception: + @property + def kvstore(self) -> KVStoreCollections: pass - if not tenant_id: - raise Exception("Tenant ID not found") - return tenant_id +class Application(Protocol): + service: Service -def get_ingest_metadata_only(app: client.Application) -> bool: - return ( - get_storage_password_value( - app=app, password_key=PasswordKeys.INGEST_METADATA_ONLY.value - ) - == "true" - ) +class FlareEventIngestor: + def __init__( + self, + *, + kvstore: KVStoreCollections, + storage_passwords: StoragePasswords, + flare_api_cls: FlareAPI = FlareAPI, + logger: Logger, + ): + self.kvstore = kvstore + self.storage_passwords = storage_passwords + self.flare_api_cls = flare_api_cls + self.logger = logger + + def run(self) -> None: + self.create_collection() + + # To avoid cron jobs from doing the same work at the same time, exit new cron jobs if a cron job is already doing work + last_fetched_timestamp = self.get_last_fetched() + if last_fetched_timestamp and last_fetched_timestamp > ( + datetime.now() - CRON_JOB_THRESHOLD_SINCE_LAST_FETCH + ): + self.logger.info( + f"Fetched events less than {int(CRON_JOB_THRESHOLD_SINCE_LAST_FETCH.seconds / 60)} minutes ago, exiting" + ) + return + + api_key = self.get_api_key() + tenant_id = self.get_tenant_id() + ingest_metadata_only = self.get_ingest_metadata_only() + + self.save_last_fetched() + events_fetched_count = 0 + for event, next_token in self.fetch_feed( + api_key=api_key, + tenant_id=tenant_id, + ingest_metadata_only=ingest_metadata_only, + ): + self.save_last_fetched() -def get_next(app: client.Application, tenant_id: int) -> Optional[str]: - return get_collection_value( - app=app, key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}" - ) + self.save_start_date(tenant_id=tenant_id) + self.save_next(tenant_id=tenant_id, next=next_token) + print(json.dumps(event), flush=True) -def get_start_date(app: client.Application) -> Optional[date]: - start_date = get_collection_value(app=app, key=CollectionKeys.START_DATE.value) - if start_date: - try: - return date.fromisoformat(start_date) - except Exception: - pass - return None + events_fetched_count += 1 + self.logger.info(f"Fetched {events_fetched_count} events") -def get_current_tenant_id(app: client.Application) -> Optional[int]: - current_tenant_id = get_collection_value( - app=app, key=CollectionKeys.CURRENT_TENANT_ID.value - ) - try: - return int(current_tenant_id) if current_tenant_id else None - except Exception: - pass - return None + def get_storage_password_value(self, *, password_key: str) -> Optional[str]: + for item in self.storage_passwords.list(): + if item.content.username == password_key: + return item.clear_password + return None -def get_last_fetched(app: client.Application) -> Optional[datetime]: - timestamp_last_fetched = get_collection_value( - app=app, key=CollectionKeys.TIMESTAMP_LAST_FETCH.value - ) - if timestamp_last_fetched: + def get_api_key(self) -> str: + api_key = self.get_storage_password_value( + password_key=PasswordKeys.API_KEY.value + ) + if not api_key: + raise Exception("API key not found") + return api_key + + def get_tenant_id(self) -> int: + stored_tenant_id = self.get_storage_password_value( + password_key=PasswordKeys.TENANT_ID.value + ) try: - return datetime.fromisoformat(timestamp_last_fetched) + tenant_id = int(stored_tenant_id) if stored_tenant_id is not None else None except Exception: pass - return None + if not tenant_id: + raise Exception("Tenant ID not found") + return tenant_id -def create_collection(app: client.Application) -> None: - if KV_COLLECTION_NAME not in app.service.kvstore: - # Create the collection - app.service.kvstore.create( - name=KV_COLLECTION_NAME, fields={"_key": "string", "value": "string"} + def get_ingest_metadata_only(self) -> bool: + return ( + self.get_storage_password_value( + password_key=PasswordKeys.INGEST_METADATA_ONLY.value + ) + == "true" ) + def get_next(self, *, tenant_id: int) -> Optional[str]: + return self.get_collection_value( + key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}" + ) -def save_start_date(app: client.Application, tenant_id: int) -> None: - current_tenant_id = get_current_tenant_id(app=app) - # If this is the first request ever, insert today's date so that future requests will be based on that - if not get_start_date(app): - save_collection_value( - app=app, - key=CollectionKeys.START_DATE.value, - value=date.today().isoformat(), + def get_start_date(self) -> Optional[date]: + start_date = self.get_collection_value(key=CollectionKeys.START_DATE.value) + if start_date: + try: + return date.fromisoformat(start_date) + except Exception: + pass + return None + + def get_current_tenant_id(self) -> Optional[int]: + current_tenant_id = self.get_collection_value( + key=CollectionKeys.CURRENT_TENANT_ID.value ) + try: + return int(current_tenant_id) if current_tenant_id else None + except Exception: + pass + return None - # If the current tenant has changed, update the start date so that future requests will be based off today - # If you switch tenants, this will avoid the old tenant from ingesting all the events before today and the day - # that tenant was switched in the first place. - if current_tenant_id != tenant_id: - app.service.kvstore[KV_COLLECTION_NAME].data.update( - id=CollectionKeys.START_DATE.value, - data=json.dumps({"value": date.today().isoformat()}), + def get_last_fetched(self) -> Optional[datetime]: + timestamp_last_fetched = self.get_collection_value( + key=CollectionKeys.TIMESTAMP_LAST_FETCH.value ) + if timestamp_last_fetched: + try: + return datetime.fromisoformat(timestamp_last_fetched) + except Exception: + pass + return None + + def create_collection(self) -> None: + if KV_COLLECTION_NAME not in self.kvstore: + # Create the collection + self.kvstore.create( + name=KV_COLLECTION_NAME, fields={"_key": "string", "value": "string"} + ) + def save_start_date(self, *, tenant_id: int) -> None: + current_tenant_id = self.get_current_tenant_id() + # If this is the first request ever, insert today's date so that future requests will be based on that + if not self.get_start_date(): + self.save_collection_value( + key=CollectionKeys.START_DATE.value, + value=date.today().isoformat(), + ) -def save_next(app: client.Application, tenant_id: int, next: Optional[str]) -> None: - # If we have a new next value, update the collection for that tenant to continue searching from that point - if not next: - return + # If the current tenant has changed, update the start date so that future requests will be based off today + # If you switch tenants, this will avoid the old tenant from ingesting all the events before today and the day + # that tenant was switched in the first place. + if current_tenant_id != tenant_id: + kvstore_collections_data: KVStoreCollectionData = self.kvstore[ + KV_COLLECTION_NAME + ].data + kvstore_collections_data.update( + id=CollectionKeys.START_DATE.value, + data=json.dumps({"value": date.today().isoformat()}), + ) - save_collection_value( - app=app, - key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}", - value=next, - ) + def save_next(self, *, tenant_id: int, next: Optional[str]) -> None: + # If we have a new next value, update the collection for that tenant to continue searching from that point + if not next: + return + self.save_collection_value( + key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}", + value=next, + ) -def save_last_fetched(app: client.Application) -> None: - save_collection_value( - app=app, - key=CollectionKeys.TIMESTAMP_LAST_FETCH.value, - value=datetime.now().isoformat(), - ) + def save_last_fetched(self) -> None: + self.save_collection_value( + key=CollectionKeys.TIMESTAMP_LAST_FETCH.value, + value=datetime.now().isoformat(), + ) + def get_collection_value(self, *, key: str) -> Optional[str]: + # Ensure collection exists + self.create_collection() -def get_collection_value(app: client.Application, key: str) -> Optional[str]: - if KV_COLLECTION_NAME in app.service.kvstore: - data = app.service.kvstore[KV_COLLECTION_NAME].data.query() + kvstore_collections_data: KVStoreCollectionData = self.kvstore[ + KV_COLLECTION_NAME + ].data + data = kvstore_collections_data.query() for entry in data: if entry["_key"] == key: return entry["value"] - return None - - -def save_collection_value(app: client.Application, key: str, value: Any) -> None: - if not get_collection_value(app=app, key=key): - app.service.kvstore[KV_COLLECTION_NAME].data.insert( - json.dumps( - { - "_key": key, - "value": value, - } + return None + + def save_collection_value(self, *, key: str, value: Any) -> None: + kvstore_collections_data: KVStoreCollectionData = self.kvstore[ + KV_COLLECTION_NAME + ].data + if not self.get_collection_value(key=key): + kvstore_collections_data.insert( + json.dumps( + { + "_key": key, + "value": value, + } + ) ) - ) - else: - app.service.kvstore[KV_COLLECTION_NAME].data.update( + return + + kvstore_collections_data.update( id=key, data=json.dumps({"value": value}), ) + def fetch_feed( + self, + *, + api_key: str, + tenant_id: int, + ingest_metadata_only: bool, + ) -> Iterator[tuple[dict, str]]: + try: + flare_api: FlareAPI = self.flare_api_cls( + api_key=api_key, tenant_id=tenant_id + ) -def fetch_feed( - logger: Logger, - app: client.Application, - api_key: str, - tenant_id: int, - ingest_metadata_only: bool, -) -> Iterator[tuple[dict, str]]: - try: - flare_api = FlareAPI(api_key=api_key, tenant_id=tenant_id) - - next = get_next(app=app, tenant_id=tenant_id) - start_date = get_start_date(app=app) - logger.info(f"Fetching {tenant_id=}, {next=}, {start_date=}") - for event_next in flare_api.fetch_feed_events( - next=next, start_date=start_date, ingest_metadata_only=ingest_metadata_only - ): - yield event_next - except Exception as e: - logger.error(f"Exception={e}") + next = self.get_next(tenant_id=tenant_id) + start_date = self.get_start_date() + self.logger.info(f"Fetching {tenant_id=}, {next=}, {start_date=}") + for event_next in flare_api.fetch_feed_events( + next=next, + start_date=start_date, + ingest_metadata_only=ingest_metadata_only, + ): + yield event_next + except Exception as e: + self.logger.error(f"Exception={e}") def get_splunk_service(logger: Logger) -> Service: @@ -257,8 +316,11 @@ def get_splunk_service(logger: Logger) -> Service: if __name__ == "__main__": logger = Logger(class_name=__file__) splunk_service = get_splunk_service(logger=logger) + app: Application = splunk_service.apps[APP_NAME] - main( + ingestor = FlareEventIngestor( + kvstore=app.service.kvstore, + storage_passwords=app.service.storage_passwords, logger=logger, - app=splunk_service.apps[APP_NAME], ) + ingestor.run() diff --git a/packages/flare/tests/bin/test_ingest_events.py b/packages/flare/tests/bin/test_ingest_events.py index 1b7b26b..c62d28d 100644 --- a/packages/flare/tests/bin/test_ingest_events.py +++ b/packages/flare/tests/bin/test_ingest_events.py @@ -2,287 +2,216 @@ import os import pytest import sys +import unittest -from datetime import date from datetime import datetime from datetime import timedelta from typing import Any -from unittest.mock import MagicMock -from unittest.mock import Mock -from unittest.mock import PropertyMock -from unittest.mock import patch +from typing import Dict +from typing import List +from typing import Optional +from typing import TypeVar sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../bin")) from constants import CRON_JOB_THRESHOLD_SINCE_LAST_FETCH from constants import KV_COLLECTION_NAME from constants import CollectionKeys -from cron_job_ingest_events import fetch_feed -from cron_job_ingest_events import get_api_key -from cron_job_ingest_events import get_collection_value -from cron_job_ingest_events import get_current_tenant_id -from cron_job_ingest_events import get_last_fetched -from cron_job_ingest_events import get_start_date -from cron_job_ingest_events import get_tenant_id -from cron_job_ingest_events import main -from cron_job_ingest_events import save_collection_value -from cron_job_ingest_events import save_start_date +from cron_job_ingest_events import FlareEventIngestor -def test_get_collection_value_expect_none() -> None: - app = MagicMock() - assert get_collection_value(app=app, key="some_key") is None - - -def test_get_collection_value_expect_result() -> None: - app = MagicMock() - app.service.kvstore.__contains__.side_effect = lambda x: x == KV_COLLECTION_NAME - app.service.kvstore[KV_COLLECTION_NAME].data.query.return_value = [ - { - "_key": "some_key", - "value": "some_value", - }, - ] - - assert get_collection_value(app=app, key="some_key") == "some_value" - - -def test_save_collection_value_expect_insert() -> None: - key = "some_key" - value = "some_value" - app = MagicMock() - save_collection_value(app=app, key=key, value=value) - app.service.kvstore[KV_COLLECTION_NAME].data.insert.assert_called_once_with( - json.dumps({"_key": key, "value": value}) - ) - - -def test_save_collection_value_expect_update() -> None: - key = "some_key" - value = "update_value" - app = MagicMock() - app.service.kvstore.__contains__.side_effect = lambda x: x == KV_COLLECTION_NAME - app.service.kvstore[KV_COLLECTION_NAME].data.query.return_value = [ - { - "_key": key, - "value": "old_value", - }, - ] - save_collection_value(app=app, key=key, value=value) - app.service.kvstore[KV_COLLECTION_NAME].data.update.assert_called_once_with( - id=key, - data=json.dumps({"value": value}), - ) - - -def test_get_api_key_tenant_id_expect_exception() -> None: - app = MagicMock() - - with pytest.raises(Exception, match="API key not found"): - get_api_key(app=app) - - with pytest.raises(Exception, match="Tenant ID not found"): - get_tenant_id(app=app) - - -def test_get_api_credentials_expect_api_key_and_tenant_id() -> None: - app = MagicMock() - - api_key_item = Mock() - type(api_key_item.content).username = PropertyMock(return_value="api_key") - type(api_key_item).clear_password = PropertyMock(return_value="some_api_key") - - tenant_id_item = Mock() - type(tenant_id_item.content).username = PropertyMock(return_value="tenant_id") - type(tenant_id_item).clear_password = PropertyMock(return_value=11111) - - app.service.storage_passwords.list.return_value = [api_key_item, tenant_id_item] - - api_key = get_api_key(app=app) - assert api_key == "some_api_key" - tenant_id = get_tenant_id(app=app) - assert tenant_id == 11111 - - -@patch( - "cron_job_ingest_events.get_collection_value", return_value="not_an_isoformat_date" -) -def test_get_start_date_expect_none(get_collection_value_mock: MagicMock) -> None: - app = MagicMock() - assert get_start_date(app=app) is None - - -@patch( - "cron_job_ingest_events.get_collection_value", return_value=date.today().isoformat() -) -def test_get_start_date_expect_date(get_collection_value_mock: MagicMock) -> None: - app = MagicMock() - assert isinstance(get_start_date(app=app), date) - - -@patch("cron_job_ingest_events.get_collection_value", return_value="not_a_number") -def test_get_current_tenant_id_expect_none( - get_collection_value_mock: MagicMock, -) -> None: - app = MagicMock() - assert get_current_tenant_id(app=app) is None - - -@patch("cron_job_ingest_events.get_collection_value", return_value="11111") -def test_get_current_tenant_id_expect_integer( - get_collection_value_mock: MagicMock, -) -> None: - app = MagicMock() - assert get_current_tenant_id(app=app) == 11111 - - -@patch( - "cron_job_ingest_events.get_collection_value", return_value="not_an_isoformat_date" -) -def test_get_last_fetched_expect_none(get_collection_value_mock: MagicMock) -> None: - app = MagicMock() - assert get_last_fetched(app=app) is None - - -@patch( - "cron_job_ingest_events.get_collection_value", - return_value=datetime.now().isoformat(), -) -def test_get_last_fetched_expect_datetime(get_collection_value_mock: MagicMock) -> None: - app = MagicMock() - assert isinstance(get_last_fetched(app=app), datetime) - - -@patch("cron_job_ingest_events.save_collection_value") -@patch("cron_job_ingest_events.get_start_date", return_value=None) -@patch("cron_job_ingest_events.get_current_tenant_id", return_value=11111) -def test_save_start_date_expect_save_collection_value_called_and_tenant_id_unchanged( - get_current_tenant_id_mock: MagicMock, - get_start_date_mock: MagicMock, - save_collection_value_mock: MagicMock, -) -> None: - app = MagicMock() - save_start_date(app=app, tenant_id=11111) - save_collection_value_mock.assert_called_once_with( - app=app, key=CollectionKeys.START_DATE.value, value=date.today().isoformat() - ) - app.service.kvstore[KV_COLLECTION_NAME].data.update.assert_not_called() - - -@patch("cron_job_ingest_events.save_collection_value") -@patch("cron_job_ingest_events.get_start_date", return_value=date.today()) -@patch("cron_job_ingest_events.get_current_tenant_id", return_value=22222) -def test_save_start_date_expect_save_collection_value_not_called_and_tenant_id_changed( - get_current_tenant_id_mock: MagicMock, - get_start_date_mock: MagicMock, - save_collection_value_mock: MagicMock, -) -> None: - app = MagicMock() - save_start_date(app=app, tenant_id=11111) - save_collection_value_mock.assert_not_called() - app.service.kvstore[KV_COLLECTION_NAME].data.update.assert_called_once_with( - id=CollectionKeys.START_DATE.value, - data=json.dumps({"value": date.today().isoformat()}), - ) - - -def test_fetch_feed_expect_exception() -> None: - logger = MagicMock() - app = MagicMock() - for _ in fetch_feed( - logger=logger, - app=app, - api_key="some_key", - tenant_id=11111, - ingest_metadata_only=False, - ): - pass - - logger.error.assert_called_once_with("Exception=Failed to fetch API Token") - - -@patch("cron_job_ingest_events.FlareAPI") -@patch("time.sleep", return_value=None) -def test_fetch_feed_expect_feed_response( - sleep: Any, flare_api_mock: MagicMock, capfd: Any -) -> None: - logger = MagicMock() - app = MagicMock() - - next = "some_next_value" - first_item = { - "actor": "this guy", - } - second_item = { - "actor": "some other guy", - } - expected_items = [first_item, second_item] - flare_api_mock_instance = flare_api_mock.return_value - flare_api_mock_instance.fetch_feed_events.return_value = iter( - [(first_item, next), (second_item, next)] - ) - - events: list[dict] = [] - for event, next_token in fetch_feed( - logger=logger, - app=app, - api_key="some_key", - tenant_id=11111, - ingest_metadata_only=False, - ): - assert next_token == next - events.append(event) - - for i in range(len(events)): - assert events[i] == expected_items[i] - - -@patch( - "cron_job_ingest_events.get_last_fetched", - return_value=datetime.now() - timedelta(minutes=5), -) -def test_main_expect_early_return(get_last_fetched_mock: MagicMock) -> None: - logger = MagicMock() - app = MagicMock() - - main(logger=logger, app=app) - logger.info.assert_called_once_with( - f"Fetched events less than {int(CRON_JOB_THRESHOLD_SINCE_LAST_FETCH.seconds / 60)} minutes ago, exiting" - ) - - -@patch("cron_job_ingest_events.fetch_feed") -@patch( - "cron_job_ingest_events.get_ingest_metadata_only", - return_value=(False), -) -@patch( - "cron_job_ingest_events.get_tenant_id", - return_value=(111), -) -@patch( - "cron_job_ingest_events.get_api_key", - return_value=("some_api_key"), -) -@patch( - "cron_job_ingest_events.get_last_fetched", - return_value=datetime.now() - timedelta(minutes=10), -) -def test_main_expect_normal_run( - get_last_fetched_mock: MagicMock, - get_api_key_mock: MagicMock, - get_tenant_id_mock: MagicMock, - get_ingest_metadata_only_mock: MagicMock, - fetch_feed_mock: MagicMock, -) -> None: - logger = MagicMock() - app = MagicMock() - - main(logger=logger, app=app) - fetch_feed_mock.assert_called_once_with( - logger=logger, - app=app, - api_key="some_api_key", - tenant_id=111, - ingest_metadata_only=False, - ) +T = TypeVar("T", bound="StoragePassword") + + +class StoragePassword: + def __init__(self, username: str, clear_password: str) -> None: + self._state = { + "username": username, + "clear_password": clear_password, + } + + @property + def content(self: T) -> T: + return self + + @property + def username(self) -> str: + return self._state["username"] + + @property + def clear_password(self) -> str: + return self._state["clear_password"] + + +class FakeStoragePasswords: + def __init__(self, passwords: List[StoragePassword]) -> None: + self._passwords = passwords + + def list(self) -> List[StoragePassword]: + return self._passwords + + +class FakeKVStoreCollectionData: + def __init__(self) -> None: + self._data: dict[str, str] = {} + + def insert(self, data: str) -> dict[str, str]: + entry = json.loads(data) + self._data[entry["_key"]] = entry["value"] + return entry + + def update(self, id: str, data: str) -> dict[str, str]: + entry = json.loads(data) + self._data[id] = entry["value"] + return entry + + def query(self, **query: dict) -> List[Dict[str, str]]: + return [{"_key": key, "value": value} for key, value in self._data.items()] + + +class FakeKVStoreCollection: + def __init__(self) -> None: + self._data = FakeKVStoreCollectionData() + + @property + def data(self) -> FakeKVStoreCollectionData: + return self._data + + +class FakeKVStoreCollections: + def __init__(self) -> None: + self._collections: dict[str, Any] = {} + + def __getitem__(self, key: str) -> FakeKVStoreCollection: + return self._collections[key] + + def __contains__(self, key: str) -> bool: + return key in self._collections + + def create(self, name: str, fields: dict) -> dict[str, Any]: + self._collections[name] = FakeKVStoreCollection() + return {"headers": {}, "reason": "Created", "status": 200, "body": ""} + + +class FakeLogger: + def __init__(self) -> None: + self.messages: List[str] = [] + + def info(self, message: str) -> None: + self.messages.append(f"INFO: {message}") + + def error(self, message: str) -> None: + self.messages.append(f"ERROR: {message}") + + +class FakeFlareAPI: + def __init__(self, api_key: str, tenant_id: int) -> None: + self.api_key = api_key + self.tenant_id = tenant_id + + def fetch_feed_events( + self, + next: Optional[str], + start_date: Optional[datetime], + ingest_metadata_only: bool, + ) -> List[tuple[dict, str]]: + return [({"event": "test_event"}, "next_token")] + + +class FlareEventIngestorTest(unittest.TestCase): + def setUp(self) -> None: + self.logger = FakeLogger() + self.kvstore = FakeKVStoreCollections() + self.storage_passwords = FakeStoragePasswords( + passwords=[ + StoragePassword(username="api_key", clear_password="test_api_key"), + StoragePassword(username="tenant_id", clear_password="123"), + StoragePassword(username="ingest_metadata_only", clear_password="true"), + ] + ) + + def test_run(self) -> None: + ingestor = FlareEventIngestor( + kvstore=self.kvstore, + storage_passwords=self.storage_passwords, + flare_api_cls=FakeFlareAPI, + logger=self.logger, + ) + + ingestor.run() + + self.assertIn("INFO: Fetched 1 events", self.logger.messages) + + test_collection_data = self.kvstore[KV_COLLECTION_NAME].data.query() + self.assertEqual(len(test_collection_data), 3) + self.assertTrue( + any(item["_key"] == "timestamp_last_fetch" for item in test_collection_data) + ) + self.assertTrue( + any(item["_key"] == "start_date" for item in test_collection_data) + ) + self.assertTrue( + any(item["_key"] == "next_123" for item in test_collection_data) + ) + + def test_run_guard(self) -> None: + ingestor = FlareEventIngestor( + kvstore=self.kvstore, + storage_passwords=self.storage_passwords, + flare_api_cls=FakeFlareAPI, + logger=self.logger, + ) + + ingestor.create_collection() + + self.kvstore[KV_COLLECTION_NAME].data.insert( + json.dumps( + { + "_key": CollectionKeys.TIMESTAMP_LAST_FETCH.value, + "value": (datetime.now() - timedelta(minutes=9)).isoformat(), + } + ) + ) + + ingestor.run() + + self.assertIn( + f"INFO: Fetched events less than {int(CRON_JOB_THRESHOLD_SINCE_LAST_FETCH.seconds / 60)} minutes ago, exiting", + self.logger.messages, + ) + + def test_create_collection(self) -> None: + ingestor = FlareEventIngestor( + kvstore=self.kvstore, + storage_passwords=self.storage_passwords, + flare_api_cls=FakeFlareAPI, + logger=self.logger, + ) + ingestor.create_collection() + + # Assert that the collection was created + self.assertIn(KV_COLLECTION_NAME, self.kvstore._collections) + + def test_get_api_key_expect_exception(self) -> None: + ingestor = FlareEventIngestor( + kvstore=self.kvstore, + storage_passwords=FakeStoragePasswords(passwords=[]), + flare_api_cls=FakeFlareAPI, + logger=self.logger, + ) + + with pytest.raises(Exception, match="API key not found"): + ingestor.run() + + def test_get_tenant_id_expect_exception(self) -> None: + ingestor = FlareEventIngestor( + kvstore=self.kvstore, + storage_passwords=FakeStoragePasswords(passwords=[ + StoragePassword(username="api_key", clear_password="test_api_key"), + ]), + flare_api_cls=FakeFlareAPI, + logger=self.logger, + ) + + with pytest.raises(Exception, match="Tenant ID not found"): + ingestor.run() \ No newline at end of file