diff --git a/packages/flare/bin/cron_job_ingest_events.py b/packages/flare/bin/cron_job_ingest_events.py index e50ceb6..02d5dff 100644 --- a/packages/flare/bin/cron_job_ingest_events.py +++ b/packages/flare/bin/cron_job_ingest_events.py @@ -7,6 +7,7 @@ from typing import Any from typing import Iterator from typing import Optional +from typing import Protocol sys.path.insert(0, os.path.join(os.path.dirname(__file__), "vendor")) @@ -21,14 +22,64 @@ from constants import PasswordKeys from flare import FlareAPI from logger import Logger -from vendor.splunklib.client import Service +from vendor.splunklib.client import Entity -def main(logger: Logger, app: client.Application) -> None: - create_collection(app=app) +class StoragePasswords(Protocol): + def list(self) -> list: + pass + + +class KVStoreCollections(Protocol): + def __getitem__(self, key: str) -> Entity: + pass + + def __contains__(self, item: str) -> bool: + pass + + def create(self, name: str, fields: dict) -> dict: + pass + + +class KVStoreCollectionData(Protocol): + def insert(self, data: str) -> dict: + pass + + def update(self, id: str, data: str) -> dict: + pass + + def query(self, **query: dict) -> list: + pass + + +class Collection(Protocol): + def __getitem__(self, key: str) -> Entity: + pass + + +class Service(Protocol): + @property + def apps(self) -> Collection: + pass + + @property + def storage_passwords(self) -> StoragePasswords: + pass + + @property + def kvstore(self) -> KVStoreCollections: + pass + + +class Application(Protocol): + service: Service + + +def main(logger: Logger, storage_passwords: StoragePasswords, kvstore: KVStoreCollections) -> None: + create_collection(kvstore=kvstore) # To avoid cron jobs from doing the same work at the same time, exit new cron jobs if a cron job is already doing work - last_fetched_timestamp = get_last_fetched(app) + last_fetched_timestamp = get_last_fetched(kvstore=kvstore) if last_fetched_timestamp and last_fetched_timestamp > ( datetime.now() - CRON_JOB_THRESHOLD_SINCE_LAST_FETCH ): @@ -37,23 +88,23 @@ def main(logger: Logger, app: client.Application) -> None: ) return - api_key = get_api_key(app=app) - tenant_id = get_tenant_id(app=app) - ingest_metadata_only = get_ingest_metadata_only(app=app) + api_key = get_api_key(storage_passwords=storage_passwords) + tenant_id = get_tenant_id(storage_passwords=storage_passwords) + ingest_metadata_only = get_ingest_metadata_only(storage_passwords=storage_passwords) - save_last_fetched(app=app) - save_last_ingested_tenant_id(app=app, tenant_id=tenant_id) + save_last_fetched(kvstore=kvstore) + save_last_ingested_tenant_id(kvstore=kvstore, tenant_id=tenant_id) events_fetched_count = 0 for event, next_token in fetch_feed( logger=logger, - app=app, + kvstore=kvstore, api_key=api_key, tenant_id=tenant_id, ingest_metadata_only=ingest_metadata_only, ): - save_last_fetched(app=app) + save_last_fetched(kvstore=kvstore) - save_next(app=app, tenant_id=tenant_id, next=next_token) + save_next(kvstore=kvstore, tenant_id=tenant_id, next=next_token) print(json.dumps(event), flush=True) @@ -63,27 +114,27 @@ def main(logger: Logger, app: client.Application) -> None: def get_storage_password_value( - app: client.Application, password_key: str + storage_passwords: StoragePasswords, password_key: str ) -> Optional[str]: - for item in app.service.storage_passwords.list(): + for item in storage_passwords.list(): if item.content.username == password_key: return item.clear_password return None -def get_api_key(app: client.Application) -> str: +def get_api_key(storage_passwords: StoragePasswords) -> str: api_key = get_storage_password_value( - app=app, password_key=PasswordKeys.API_KEY.value + storage_passwords=storage_passwords, password_key=PasswordKeys.API_KEY.value ) if not api_key: raise Exception("API key not found") return api_key -def get_tenant_id(app: client.Application) -> int: +def get_tenant_id(storage_passwords: StoragePasswords) -> int: stored_tenant_id = get_storage_password_value( - app=app, password_key=PasswordKeys.TENANT_ID.value + storage_passwords=storage_passwords, password_key=PasswordKeys.TENANT_ID.value ) try: tenant_id = int(stored_tenant_id) if stored_tenant_id is not None else None @@ -95,23 +146,23 @@ def get_tenant_id(app: client.Application) -> int: return tenant_id -def get_ingest_metadata_only(app: client.Application) -> bool: +def get_ingest_metadata_only(storage_passwords: StoragePasswords) -> bool: return ( get_storage_password_value( - app=app, password_key=PasswordKeys.INGEST_METADATA_ONLY.value + storage_passwords=storage_passwords, password_key=PasswordKeys.INGEST_METADATA_ONLY.value ) == "true" ) -def get_next(app: client.Application, tenant_id: int) -> Optional[str]: +def get_next(kvstore: KVStoreCollections, tenant_id: int) -> Optional[str]: return get_collection_value( - app=app, key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}" + kvstore=kvstore, key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}" ) -def get_start_date(app: client.Application) -> Optional[date]: - start_date = get_collection_value(app=app, key=CollectionKeys.START_DATE.value) +def get_start_date(kvstore: KVStoreCollections) -> Optional[date]: + start_date = get_collection_value(kvstore=kvstore, key=CollectionKeys.START_DATE.value) if start_date: try: return date.fromisoformat(start_date) @@ -120,9 +171,9 @@ def get_start_date(app: client.Application) -> Optional[date]: return None -def get_last_ingested_tenant_id(app: client.Application) -> Optional[int]: +def get_last_ingested_tenant_id(kvstore: KVStoreCollections) -> Optional[int]: last_ingested_tenant_id = get_collection_value( - app=app, key=CollectionKeys.LAST_INGESTED_TENANT_ID.value + kvstore=kvstore, key=CollectionKeys.LAST_INGESTED_TENANT_ID.value ) try: return int(last_ingested_tenant_id) if last_ingested_tenant_id else None @@ -131,9 +182,9 @@ def get_last_ingested_tenant_id(app: client.Application) -> Optional[int]: return None -def get_last_fetched(app: client.Application) -> Optional[datetime]: +def get_last_fetched(kvstore: KVStoreCollections) -> Optional[datetime]: timestamp_last_fetched = get_collection_value( - app=app, key=CollectionKeys.TIMESTAMP_LAST_FETCH.value + kvstore=kvstore, key=CollectionKeys.TIMESTAMP_LAST_FETCH.value ) if timestamp_last_fetched: try: @@ -143,55 +194,55 @@ def get_last_fetched(app: client.Application) -> Optional[datetime]: return None -def create_collection(app: client.Application) -> None: - if KV_COLLECTION_NAME not in app.service.kvstore: +def create_collection(kvstore: KVStoreCollections) -> None: + if KV_COLLECTION_NAME not in kvstore: # Create the collection - app.service.kvstore.create( + kvstore.create( name=KV_COLLECTION_NAME, fields={"_key": "string", "value": "string"} ) -def save_last_ingested_tenant_id(app: client.Application, tenant_id: int) -> None: +def save_last_ingested_tenant_id(kvstore: KVStoreCollections, tenant_id: int) -> None: # If the tenant has changed, update the start date so that future requests will be based off today # If you switch tenants, this will avoid the old tenant from ingesting all the events before today and the day # that tenant was switched in the first place. - if get_last_ingested_tenant_id(app=app) != tenant_id: + if get_last_ingested_tenant_id(kvstore=kvstore) != tenant_id: save_collection_value( - app=app, + kvstore=kvstore, key=CollectionKeys.START_DATE.value, value=date.today().isoformat(), ) save_collection_value( - app=app, + kvstore=kvstore, key=CollectionKeys.LAST_INGESTED_TENANT_ID.value, value=tenant_id, ) -def save_next(app: client.Application, tenant_id: int, next: Optional[str]) -> None: +def save_next(kvstore: KVStoreCollections, tenant_id: int, next: Optional[str]) -> None: # If we have a new next value, update the collection for that tenant to continue searching from that point if not next: return save_collection_value( - app=app, + kvstore=kvstore, key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}", value=next, ) -def save_last_fetched(app: client.Application) -> None: +def save_last_fetched(kvstore: KVStoreCollections) -> None: save_collection_value( - app=app, + kvstore=kvstore, key=CollectionKeys.TIMESTAMP_LAST_FETCH.value, value=datetime.now().isoformat(), ) -def get_collection_value(app: client.Application, key: str) -> Optional[str]: - if KV_COLLECTION_NAME in app.service.kvstore: - data = app.service.kvstore[KV_COLLECTION_NAME].data.query() +def get_collection_value(kvstore: KVStoreCollections, key: str) -> Optional[str]: + if KV_COLLECTION_NAME in kvstore: + data = kvstore[KV_COLLECTION_NAME].data.query() for entry in data: if entry["_key"] == key: return entry["value"] @@ -199,9 +250,9 @@ def get_collection_value(app: client.Application, key: str) -> Optional[str]: return None -def save_collection_value(app: client.Application, key: str, value: Any) -> None: - if not get_collection_value(app=app, key=key): - app.service.kvstore[KV_COLLECTION_NAME].data.insert( +def save_collection_value(kvstore: KVStoreCollections, key: str, value: Any) -> None: + if not get_collection_value(kvstore=kvstore, key=key): + kvstore[KV_COLLECTION_NAME].data.insert( json.dumps( { "_key": key, @@ -210,7 +261,7 @@ def save_collection_value(app: client.Application, key: str, value: Any) -> None ) ) else: - app.service.kvstore[KV_COLLECTION_NAME].data.update( + kvstore[KV_COLLECTION_NAME].data.update( id=key, data=json.dumps({"value": value}), ) @@ -218,7 +269,7 @@ def save_collection_value(app: client.Application, key: str, value: Any) -> None def fetch_feed( logger: Logger, - app: client.Application, + kvstore: KVStoreCollections, api_key: str, tenant_id: int, ingest_metadata_only: bool, @@ -226,8 +277,8 @@ def fetch_feed( try: flare_api = FlareAPI(api_key=api_key, tenant_id=tenant_id) - next = get_next(app=app, tenant_id=tenant_id) - start_date = get_start_date(app=app) + next = get_next(kvstore=kvstore, tenant_id=tenant_id) + start_date = get_start_date(kvstore=kvstore) logger.info(f"Fetching {tenant_id=}, {next=}, {start_date=}") for event_next in flare_api.fetch_feed_events( next=next, start_date=start_date, ingest_metadata_only=ingest_metadata_only @@ -254,9 +305,11 @@ def get_splunk_service(logger: Logger) -> Service: if __name__ == "__main__": logger = Logger(class_name=__file__) - splunk_service = get_splunk_service(logger=logger) + splunk_service: Service = get_splunk_service(logger=logger) + app: Application = splunk_service.apps[APP_NAME] main( logger=logger, - app=splunk_service.apps[APP_NAME], + storage_passwords=app.service.storage_passwords, + kvstore=app.service.kvstore, ) diff --git a/packages/flare/tests/bin/test_ingest_events.py b/packages/flare/tests/bin/test_ingest_events.py index c836bba..f9a4cb3 100644 --- a/packages/flare/tests/bin/test_ingest_events.py +++ b/packages/flare/tests/bin/test_ingest_events.py @@ -31,29 +31,29 @@ def test_get_collection_value_expect_none() -> None: - app = MagicMock() - assert get_collection_value(app=app, key="some_key") is None + kvstore = MagicMock() + assert get_collection_value(kvstore=kvstore, key="some_key") is None def test_get_collection_value_expect_result() -> None: - app = MagicMock() - app.service.kvstore.__contains__.side_effect = lambda x: x == KV_COLLECTION_NAME - app.service.kvstore[KV_COLLECTION_NAME].data.query.return_value = [ + kvstore = MagicMock() + kvstore.__contains__.side_effect = lambda x: x == KV_COLLECTION_NAME + kvstore[KV_COLLECTION_NAME].data.query.return_value = [ { "_key": "some_key", "value": "some_value", }, ] - assert get_collection_value(app=app, key="some_key") == "some_value" + assert get_collection_value(kvstore=kvstore, key="some_key") == "some_value" def test_save_collection_value_expect_insert() -> None: key = "some_key" value = "some_value" - app = MagicMock() - save_collection_value(app=app, key=key, value=value) - app.service.kvstore[KV_COLLECTION_NAME].data.insert.assert_called_once_with( + kvstore = MagicMock() + save_collection_value(kvstore=kvstore, key=key, value=value) + kvstore[KV_COLLECTION_NAME].data.insert.assert_called_once_with( json.dumps({"_key": key, "value": value}) ) @@ -61,33 +61,33 @@ def test_save_collection_value_expect_insert() -> None: def test_save_collection_value_expect_update() -> None: key = "some_key" value = "update_value" - app = MagicMock() - app.service.kvstore.__contains__.side_effect = lambda x: x == KV_COLLECTION_NAME - app.service.kvstore[KV_COLLECTION_NAME].data.query.return_value = [ + kvstore = MagicMock() + kvstore.__contains__.side_effect = lambda x: x == KV_COLLECTION_NAME + kvstore[KV_COLLECTION_NAME].data.query.return_value = [ { "_key": key, "value": "old_value", }, ] - save_collection_value(app=app, key=key, value=value) - app.service.kvstore[KV_COLLECTION_NAME].data.update.assert_called_once_with( + save_collection_value(kvstore=kvstore, key=key, value=value) + kvstore[KV_COLLECTION_NAME].data.update.assert_called_once_with( id=key, data=json.dumps({"value": value}), ) def test_get_api_key_tenant_id_expect_exception() -> None: - app = MagicMock() + storage_passwords = MagicMock() with pytest.raises(Exception, match="API key not found"): - get_api_key(app=app) + get_api_key(storage_passwords=storage_passwords) with pytest.raises(Exception, match="Tenant ID not found"): - get_tenant_id(app=app) + get_tenant_id(storage_passwords=storage_passwords) def test_get_api_credentials_expect_api_key_and_tenant_id() -> None: - app = MagicMock() + storage_passwords = MagicMock() api_key_item = Mock() type(api_key_item.content).username = PropertyMock(return_value="api_key") @@ -97,11 +97,11 @@ def test_get_api_credentials_expect_api_key_and_tenant_id() -> None: type(tenant_id_item.content).username = PropertyMock(return_value="tenant_id") type(tenant_id_item).clear_password = PropertyMock(return_value=11111) - app.service.storage_passwords.list.return_value = [api_key_item, tenant_id_item] + storage_passwords.list.return_value = [api_key_item, tenant_id_item] - api_key = get_api_key(app=app) + api_key = get_api_key(storage_passwords=storage_passwords) assert api_key == "some_api_key" - tenant_id = get_tenant_id(app=app) + tenant_id = get_tenant_id(storage_passwords=storage_passwords) assert tenant_id == 11111 @@ -109,40 +109,40 @@ def test_get_api_credentials_expect_api_key_and_tenant_id() -> None: "cron_job_ingest_events.get_collection_value", return_value="not_an_isoformat_date" ) def test_get_start_date_expect_none(get_collection_value_mock: MagicMock) -> None: - app = MagicMock() - assert get_start_date(app=app) is None + kvstore = MagicMock() + assert get_start_date(kvstore=kvstore) is None @patch( "cron_job_ingest_events.get_collection_value", return_value=date.today().isoformat() ) def test_get_start_date_expect_date(get_collection_value_mock: MagicMock) -> None: - app = MagicMock() - assert isinstance(get_start_date(app=app), date) + kvstore = MagicMock() + assert isinstance(get_start_date(kvstore), date) @patch("cron_job_ingest_events.get_collection_value", return_value="not_a_number") def test_get_last_ingested_tenant_id_expect_none( get_collection_value_mock: MagicMock, ) -> None: - app = MagicMock() - assert get_last_ingested_tenant_id(app=app) is None + kvstore = MagicMock() + assert get_last_ingested_tenant_id(kvstore=kvstore) is None @patch("cron_job_ingest_events.get_collection_value", return_value="11111") def test_get_last_ingested_tenant_id_expect_integer( get_collection_value_mock: MagicMock, ) -> None: - app = MagicMock() - assert get_last_ingested_tenant_id(app=app) == 11111 + kvstore = MagicMock() + assert get_last_ingested_tenant_id(kvstore=kvstore) == 11111 @patch( "cron_job_ingest_events.get_collection_value", return_value="not_an_isoformat_date" ) def test_get_last_fetched_expect_none(get_collection_value_mock: MagicMock) -> None: - app = MagicMock() - assert get_last_fetched(app=app) is None + kvstore = MagicMock() + assert get_last_fetched(kvstore=kvstore) is None @patch( @@ -150,8 +150,8 @@ def test_get_last_fetched_expect_none(get_collection_value_mock: MagicMock) -> N return_value=datetime.now().isoformat(), ) def test_get_last_fetched_expect_datetime(get_collection_value_mock: MagicMock) -> None: - app = MagicMock() - assert isinstance(get_last_fetched(app=app), datetime) + kvstore = MagicMock() + assert isinstance(get_last_fetched(kvstore=kvstore), datetime) @patch("cron_job_ingest_events.save_collection_value") @@ -160,17 +160,17 @@ def test_save_last_ingested_tenant_id_expect_save_collection_value_called_and_te get_last_ingested_tenant_id_mock: MagicMock, save_collection_value_mock: MagicMock, ) -> None: - app = MagicMock() - save_last_ingested_tenant_id(app=app, tenant_id=11111) + kvstore = MagicMock() + save_last_ingested_tenant_id(kvstore=kvstore, tenant_id=11111) save_collection_value_mock.assert_has_calls( [ call( - app=app, + kvstore=kvstore, key=CollectionKeys.START_DATE.value, value=date.today().isoformat(), ), call( - app=app, key=CollectionKeys.LAST_INGESTED_TENANT_ID.value, value=11111 + kvstore=kvstore, key=CollectionKeys.LAST_INGESTED_TENANT_ID.value, value=11111 ), ] ) @@ -184,17 +184,17 @@ def test_save_last_ingested_tenant_id_expect_save_collection_value_not_called_an get_start_date_mock: MagicMock, save_collection_value_mock: MagicMock, ) -> None: - app = MagicMock() - save_last_ingested_tenant_id(app=app, tenant_id=11111) + kvstore = MagicMock() + save_last_ingested_tenant_id(kvstore=kvstore, tenant_id=11111) save_collection_value_mock.assert_has_calls( [ call( - app=app, + kvstore=kvstore, key=CollectionKeys.START_DATE.value, value=date.today().isoformat(), ), call( - app=app, key=CollectionKeys.LAST_INGESTED_TENANT_ID.value, value=11111 + kvstore=kvstore, key=CollectionKeys.LAST_INGESTED_TENANT_ID.value, value=11111 ), ] ) @@ -208,19 +208,19 @@ def test_save_last_ingested_tenant_id_expect_same_tenant_id( get_start_date_mock: MagicMock, save_collection_value_mock: MagicMock, ) -> None: - app = MagicMock() - save_last_ingested_tenant_id(app=app, tenant_id=11111) + kvstore = MagicMock() + save_last_ingested_tenant_id(kvstore=kvstore, tenant_id=11111) save_collection_value_mock.assert_called_once_with( - app=app, key=CollectionKeys.LAST_INGESTED_TENANT_ID.value, value=11111 + kvstore=kvstore, key=CollectionKeys.LAST_INGESTED_TENANT_ID.value, value=11111 ) def test_fetch_feed_expect_exception() -> None: logger = MagicMock() - app = MagicMock() + kvstore = MagicMock() for _ in fetch_feed( logger=logger, - app=app, + kvstore=kvstore, api_key="some_key", tenant_id=11111, ingest_metadata_only=False, @@ -236,7 +236,7 @@ def test_fetch_feed_expect_feed_response( sleep: Any, flare_api_mock: MagicMock, capfd: Any ) -> None: logger = MagicMock() - app = MagicMock() + kvstore = MagicMock() next = "some_next_value" first_item = { @@ -254,7 +254,7 @@ def test_fetch_feed_expect_feed_response( events: list[dict] = [] for event, next_token in fetch_feed( logger=logger, - app=app, + kvstore=kvstore, api_key="some_key", tenant_id=11111, ingest_metadata_only=False, @@ -272,9 +272,10 @@ def test_fetch_feed_expect_feed_response( ) def test_main_expect_early_return(get_last_fetched_mock: MagicMock) -> None: logger = MagicMock() - app = MagicMock() + storage_passwords = MagicMock() + kvstore = MagicMock() - main(logger=logger, app=app) + main(logger=logger, storage_passwords=storage_passwords, kvstore=kvstore) logger.info.assert_called_once_with( f"Fetched events less than {int(CRON_JOB_THRESHOLD_SINCE_LAST_FETCH.seconds / 60)} minutes ago, exiting" ) @@ -305,12 +306,13 @@ def test_main_expect_normal_run( fetch_feed_mock: MagicMock, ) -> None: logger = MagicMock() - app = MagicMock() + storage_passwords = MagicMock() + kvstore = MagicMock() - main(logger=logger, app=app) + main(logger=logger, storage_passwords=storage_passwords, kvstore=kvstore) fetch_feed_mock.assert_called_once_with( logger=logger, - app=app, + kvstore=kvstore, api_key="some_api_key", tenant_id=111, ingest_metadata_only=False,