From 670638a22899e24cffe723f889877fdf8e159eb4 Mon Sep 17 00:00:00 2001 From: Mark Kasaboski Date: Mon, 18 Nov 2024 17:32:02 -0500 Subject: [PATCH] Adds Splunk interfaces --- packages/flare/bin/cron_job_ingest_events.py | 155 +++++++++++++------ 1 file changed, 105 insertions(+), 50 deletions(-) diff --git a/packages/flare/bin/cron_job_ingest_events.py b/packages/flare/bin/cron_job_ingest_events.py index 700e5db..29a0c51 100644 --- a/packages/flare/bin/cron_job_ingest_events.py +++ b/packages/flare/bin/cron_job_ingest_events.py @@ -7,6 +7,7 @@ from typing import Any from typing import Iterator from typing import Optional +from typing import Protocol sys.path.insert(0, os.path.join(os.path.dirname(__file__), "vendor")) @@ -21,14 +22,68 @@ from constants import PasswordKeys from flare import FlareAPI from logger import Logger -from vendor.splunklib.client import Service +from vendor.splunklib.client import Entity -def main(logger: Logger, app: client.Application) -> None: - create_collection(app=app) +class Indexes(Protocol): + def create(self, name: str, **params: dict) -> Entity: + pass + + +class StoragePasswords(Protocol): + def list(self) -> list: + pass + + +class KVStoreCollections(Protocol): + def __contains__(self, item: str) -> bool: + pass + + def create(self, name: str, fields: dict) -> dict: + pass + + +class KVStoreCollectionsData(Protocol): + def insert(self, data: str) -> dict: + pass + + def update(self, id: str, data: str) -> dict: + pass + +class Collection(Protocol): + def __getitem__(self, key: str) -> Entity: + pass + +class Service(Protocol): + @property + def apps(self) -> Collection: + pass + + @property + def indexes(self) -> Indexes: + pass + + @property + def storage_passwords(self) -> StoragePasswords: + pass + + @property + def kvstore(self) -> KVStoreCollections: + pass + + +class Application(Protocol): + service: Service + + +def main(logger: Logger, app: Application) -> None: + kvstore = app.service.kvstore + storage_passwords = app.service.storage_passwords + + create_collection(kvstore=kvstore) # To avoid cron jobs from doing the same work at the same time, exit new cron jobs if a cron job is already doing work - last_fetched_timestamp = get_last_fetched(app) + last_fetched_timestamp = get_last_fetched(kvstore=kvstore) if last_fetched_timestamp and last_fetched_timestamp > ( datetime.now() - CRON_JOB_THRESHOLD_SINCE_LAST_FETCH ): @@ -37,23 +92,23 @@ def main(logger: Logger, app: client.Application) -> None: ) return - api_key = get_api_key(app=app) - tenant_id = get_tenant_id(app=app) - ingest_metadata_only = get_ingest_metadata_only(app=app) + api_key = get_api_key(storage_passwords=storage_passwords) + tenant_id = get_tenant_id(storage_passwords=storage_passwords) + ingest_metadata_only = get_ingest_metadata_only(storage_passwords=storage_passwords) - save_last_fetched(app=app) + save_last_fetched(kvstore=kvstore) events_fetchd_count = 0 for event, next_token in fetch_feed( logger=logger, - app=app, + kvstore=kvstore, api_key=api_key, tenant_id=tenant_id, ingest_metadata_only=ingest_metadata_only, ): - save_last_fetched(app=app) + save_last_fetched(kvstore=kvstore) - save_start_date(app=app, tenant_id=tenant_id) - save_next(app=app, tenant_id=tenant_id, next=next_token) + save_start_date(kvstore=kvstore, tenant_id=tenant_id) + save_next(kvstore=kvstore, tenant_id=tenant_id, next=next_token) print(json.dumps(event), flush=True) @@ -63,27 +118,27 @@ def main(logger: Logger, app: client.Application) -> None: def get_storage_password_value( - app: client.Application, password_key: str + storage_passwords: StoragePasswords, password_key: str ) -> Optional[str]: - for item in app.service.storage_passwords.list(): + for item in storage_passwords.list(): if item.content.username == password_key: return item.clear_password return None -def get_api_key(app: client.Application) -> str: +def get_api_key(storage_passwords: StoragePasswords) -> str: api_key = get_storage_password_value( - app=app, password_key=PasswordKeys.API_KEY.value + storage_passwords=storage_passwords, password_key=PasswordKeys.API_KEY.value ) if not api_key: raise Exception("API key not found") return api_key -def get_tenant_id(app: client.Application) -> int: +def get_tenant_id(storage_passwords: StoragePasswords) -> int: stored_tenant_id = get_storage_password_value( - app=app, password_key=PasswordKeys.TENANT_ID.value + storage_passwords=storage_passwords, password_key=PasswordKeys.TENANT_ID.value ) try: tenant_id = int(stored_tenant_id) if stored_tenant_id is not None else None @@ -95,23 +150,23 @@ def get_tenant_id(app: client.Application) -> int: return tenant_id -def get_ingest_metadata_only(app: client.Application) -> bool: +def get_ingest_metadata_only(storage_passwords: StoragePasswords) -> bool: return ( get_storage_password_value( - app=app, password_key=PasswordKeys.INGEST_METADATA_ONLY.value + storage_passwords=storage_passwords, password_key=PasswordKeys.INGEST_METADATA_ONLY.value ) == "true" ) -def get_next(app: client.Application, tenant_id: int) -> Optional[str]: +def get_next(kvstore: KVStoreCollections, tenant_id: int) -> Optional[str]: return get_collection_value( - app=app, key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}" + kvstore=kvstore, key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}" ) -def get_start_date(app: client.Application) -> Optional[date]: - start_date = get_collection_value(app=app, key=CollectionKeys.START_DATE.value) +def get_start_date(kvstore: KVStoreCollections) -> Optional[date]: + start_date = get_collection_value(kvstore=kvstore, key=CollectionKeys.START_DATE.value) if start_date: try: return date.fromisoformat(start_date) @@ -120,9 +175,9 @@ def get_start_date(app: client.Application) -> Optional[date]: return None -def get_current_tenant_id(app: client.Application) -> Optional[int]: +def get_current_tenant_id(kvstore: KVStoreCollections) -> Optional[int]: current_tenant_id = get_collection_value( - app=app, key=CollectionKeys.CURRENT_TENANT_ID.value + kvstore=kvstore, key=CollectionKeys.CURRENT_TENANT_ID.value ) try: return int(current_tenant_id) if current_tenant_id else None @@ -131,9 +186,9 @@ def get_current_tenant_id(app: client.Application) -> Optional[int]: return None -def get_last_fetched(app: client.Application) -> Optional[datetime]: +def get_last_fetched(kvstore: KVStoreCollections) -> Optional[datetime]: timestamp_last_fetched = get_collection_value( - app=app, key=CollectionKeys.TIMESTAMP_LAST_FETCH.value + kvstore=kvstore, key=CollectionKeys.TIMESTAMP_LAST_FETCH.value ) if timestamp_last_fetched: try: @@ -143,20 +198,20 @@ def get_last_fetched(app: client.Application) -> Optional[datetime]: return None -def create_collection(app: client.Application) -> None: - if KV_COLLECTION_NAME not in app.service.kvstore: +def create_collection(kvstore: KVStoreCollections) -> None: + if KV_COLLECTION_NAME not in kvstore: # Create the collection - app.service.kvstore.create( + kvstore.create( name=KV_COLLECTION_NAME, fields={"_key": "string", "value": "string"} ) -def save_start_date(app: client.Application, tenant_id: int) -> None: - current_tenant_id = get_current_tenant_id(app=app) +def save_start_date(kvstore: KVStoreCollections, tenant_id: int) -> None: + current_tenant_id = get_current_tenant_id(kvstore=kvstore) # If this is the first request ever, insert today's date so that future requests will be based on that - if not get_start_date(app): + if not get_start_date(kvstore=kvstore): save_collection_value( - app=app, + kvstore=kvstore, key=CollectionKeys.START_DATE.value, value=date.today().isoformat(), ) @@ -165,35 +220,35 @@ def save_start_date(app: client.Application, tenant_id: int) -> None: # If you switch tenants, this will avoid the old tenant from ingesting all the events before today and the day # that tenant was switched in the first place. if current_tenant_id != tenant_id: - app.service.kvstore[KV_COLLECTION_NAME].data.update( + kvstore[KV_COLLECTION_NAME].data.update( id=CollectionKeys.START_DATE.value, data=json.dumps({"value": date.today().isoformat()}), ) -def save_next(app: client.Application, tenant_id: int, next: Optional[str]) -> None: +def save_next(kvstore: KVStoreCollections, tenant_id: int, next: Optional[str]) -> None: # If we have a new next value, update the collection for that tenant to continue searching from that point if not next: return save_collection_value( - app=app, + kvstore=kvstore, key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}", value=next, ) -def save_last_fetched(app: client.Application) -> None: +def save_last_fetched(kvstore: KVStoreCollections) -> None: save_collection_value( - app=app, + kvstore=kvstore, key=CollectionKeys.TIMESTAMP_LAST_FETCH.value, value=datetime.now().isoformat(), ) -def get_collection_value(app: client.Application, key: str) -> Optional[str]: - if KV_COLLECTION_NAME in app.service.kvstore: - data = app.service.kvstore[KV_COLLECTION_NAME].data.query() +def get_collection_value(kvstore: KVStoreCollections, key: str) -> Optional[str]: + if KV_COLLECTION_NAME in kvstore: + data = kvstore[KV_COLLECTION_NAME].data.query() for entry in data: if entry["_key"] == key: return entry["value"] @@ -201,9 +256,9 @@ def get_collection_value(app: client.Application, key: str) -> Optional[str]: return None -def save_collection_value(app: client.Application, key: str, value: Any) -> None: - if not get_collection_value(app=app, key=key): - app.service.kvstore[KV_COLLECTION_NAME].data.insert( +def save_collection_value(kvstore: KVStoreCollections, key: str, value: Any) -> None: + if not get_collection_value(kvstore=kvstore, key=key): + kvstore[KV_COLLECTION_NAME].data.insert( json.dumps( { "_key": key, @@ -212,7 +267,7 @@ def save_collection_value(app: client.Application, key: str, value: Any) -> None ) ) else: - app.service.kvstore[KV_COLLECTION_NAME].data.update( + kvstore[KV_COLLECTION_NAME].data.update( id=key, data=json.dumps({"value": value}), ) @@ -220,7 +275,7 @@ def save_collection_value(app: client.Application, key: str, value: Any) -> None def fetch_feed( logger: Logger, - app: client.Application, + kvstore: KVStoreCollections, api_key: str, tenant_id: int, ingest_metadata_only: bool, @@ -228,8 +283,8 @@ def fetch_feed( try: flare_api = FlareAPI(api_key=api_key, tenant_id=tenant_id) - next = get_next(app=app, tenant_id=tenant_id) - start_date = get_start_date(app=app) + next = get_next(kvstore=kvstore, tenant_id=tenant_id) + start_date = get_start_date(kvstore=kvstore) logger.info(f"Fetching {tenant_id=}, {next=}, {start_date=}") for event_next in flare_api.fetch_feed_events( next=next, start_date=start_date, ingest_metadata_only=ingest_metadata_only