Skip to content

Commit

Permalink
Adds Splunk interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Kasaboski committed Nov 19, 2024
1 parent 0985cda commit cf026dc
Showing 1 changed file with 108 additions and 50 deletions.
158 changes: 108 additions & 50 deletions packages/flare/bin/cron_job_ingest_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any
from typing import Iterator
from typing import Optional
from typing import Protocol


sys.path.insert(0, os.path.join(os.path.dirname(__file__), "vendor"))
Expand All @@ -21,14 +22,71 @@
from constants import PasswordKeys
from flare import FlareAPI
from logger import Logger
from vendor.splunklib.client import Service
from vendor.splunklib.client import Entity


def main(logger: Logger, app: client.Application) -> None:
create_collection(app=app)
class Indexes(Protocol):
def create(self, name: str, **params: dict) -> Entity:
pass


class StoragePasswords(Protocol):
def list(self) -> list:
pass


class KVStoreCollections(Protocol):
def __getitem__(self, key: str) -> Entity:
pass

def __contains__(self, item: str) -> bool:
pass

def create(self, name: str, fields: dict) -> dict:
pass


class KVStoreCollectionsData(Protocol):
def insert(self, data: str) -> dict:
pass

def update(self, id: str, data: str) -> dict:
pass

class Collection(Protocol):
def __getitem__(self, key: str) -> Entity:
pass

class Service(Protocol):
@property
def apps(self) -> Collection:
pass

@property
def indexes(self) -> Indexes:
pass

@property
def storage_passwords(self) -> StoragePasswords:
pass

@property
def kvstore(self) -> KVStoreCollections:
pass


class Application(Protocol):
service: Service


def main(logger: Logger, app: Application) -> None:
kvstore = app.service.kvstore
storage_passwords = app.service.storage_passwords

create_collection(kvstore=kvstore)

# To avoid cron jobs from doing the same work at the same time, exit new cron jobs if a cron job is already doing work
last_fetched_timestamp = get_last_fetched(app)
last_fetched_timestamp = get_last_fetched(kvstore=kvstore)
if last_fetched_timestamp and last_fetched_timestamp > (
datetime.now() - CRON_JOB_THRESHOLD_SINCE_LAST_FETCH
):
Expand All @@ -37,23 +95,23 @@ def main(logger: Logger, app: client.Application) -> None:
)
return

api_key = get_api_key(app=app)
tenant_id = get_tenant_id(app=app)
ingest_metadata_only = get_ingest_metadata_only(app=app)
api_key = get_api_key(storage_passwords=storage_passwords)
tenant_id = get_tenant_id(storage_passwords=storage_passwords)
ingest_metadata_only = get_ingest_metadata_only(storage_passwords=storage_passwords)

save_last_fetched(app=app)
save_last_fetched(kvstore=kvstore)
events_fetchd_count = 0
for event, next_token in fetch_feed(
logger=logger,
app=app,
kvstore=kvstore,
api_key=api_key,
tenant_id=tenant_id,
ingest_metadata_only=ingest_metadata_only,
):
save_last_fetched(app=app)
save_last_fetched(kvstore=kvstore)

save_start_date(app=app, tenant_id=tenant_id)
save_next(app=app, tenant_id=tenant_id, next=next_token)
save_start_date(kvstore=kvstore, tenant_id=tenant_id)
save_next(kvstore=kvstore, tenant_id=tenant_id, next=next_token)

print(json.dumps(event), flush=True)

Expand All @@ -63,27 +121,27 @@ def main(logger: Logger, app: client.Application) -> None:


def get_storage_password_value(
app: client.Application, password_key: str
storage_passwords: StoragePasswords, password_key: str
) -> Optional[str]:
for item in app.service.storage_passwords.list():
for item in storage_passwords.list():
if item.content.username == password_key:
return item.clear_password

return None


def get_api_key(app: client.Application) -> str:
def get_api_key(storage_passwords: StoragePasswords) -> str:
api_key = get_storage_password_value(
app=app, password_key=PasswordKeys.API_KEY.value
storage_passwords=storage_passwords, password_key=PasswordKeys.API_KEY.value
)
if not api_key:
raise Exception("API key not found")
return api_key


def get_tenant_id(app: client.Application) -> int:
def get_tenant_id(storage_passwords: StoragePasswords) -> int:
stored_tenant_id = get_storage_password_value(
app=app, password_key=PasswordKeys.TENANT_ID.value
storage_passwords=storage_passwords, password_key=PasswordKeys.TENANT_ID.value
)
try:
tenant_id = int(stored_tenant_id) if stored_tenant_id is not None else None
Expand All @@ -95,23 +153,23 @@ def get_tenant_id(app: client.Application) -> int:
return tenant_id


def get_ingest_metadata_only(app: client.Application) -> bool:
def get_ingest_metadata_only(storage_passwords: StoragePasswords) -> bool:
return (
get_storage_password_value(
app=app, password_key=PasswordKeys.INGEST_METADATA_ONLY.value
storage_passwords=storage_passwords, password_key=PasswordKeys.INGEST_METADATA_ONLY.value
)
== "true"
)


def get_next(app: client.Application, tenant_id: int) -> Optional[str]:
def get_next(kvstore: KVStoreCollections, tenant_id: int) -> Optional[str]:
return get_collection_value(
app=app, key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}"
kvstore=kvstore, key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}"
)


def get_start_date(app: client.Application) -> Optional[date]:
start_date = get_collection_value(app=app, key=CollectionKeys.START_DATE.value)
def get_start_date(kvstore: KVStoreCollections) -> Optional[date]:
start_date = get_collection_value(kvstore=kvstore, key=CollectionKeys.START_DATE.value)
if start_date:
try:
return date.fromisoformat(start_date)
Expand All @@ -120,9 +178,9 @@ def get_start_date(app: client.Application) -> Optional[date]:
return None


def get_current_tenant_id(app: client.Application) -> Optional[int]:
def get_current_tenant_id(kvstore: KVStoreCollections) -> Optional[int]:
current_tenant_id = get_collection_value(
app=app, key=CollectionKeys.CURRENT_TENANT_ID.value
kvstore=kvstore, key=CollectionKeys.CURRENT_TENANT_ID.value
)
try:
return int(current_tenant_id) if current_tenant_id else None
Expand All @@ -131,9 +189,9 @@ def get_current_tenant_id(app: client.Application) -> Optional[int]:
return None


def get_last_fetched(app: client.Application) -> Optional[datetime]:
def get_last_fetched(kvstore: KVStoreCollections) -> Optional[datetime]:
timestamp_last_fetched = get_collection_value(
app=app, key=CollectionKeys.TIMESTAMP_LAST_FETCH.value
kvstore=kvstore, key=CollectionKeys.TIMESTAMP_LAST_FETCH.value
)
if timestamp_last_fetched:
try:
Expand All @@ -143,20 +201,20 @@ def get_last_fetched(app: client.Application) -> Optional[datetime]:
return None


def create_collection(app: client.Application) -> None:
if KV_COLLECTION_NAME not in app.service.kvstore:
def create_collection(kvstore: KVStoreCollections) -> None:
if KV_COLLECTION_NAME not in kvstore:
# Create the collection
app.service.kvstore.create(
kvstore.create(
name=KV_COLLECTION_NAME, fields={"_key": "string", "value": "string"}
)


def save_start_date(app: client.Application, tenant_id: int) -> None:
current_tenant_id = get_current_tenant_id(app=app)
def save_start_date(kvstore: KVStoreCollections, tenant_id: int) -> None:
current_tenant_id = get_current_tenant_id(kvstore=kvstore)
# If this is the first request ever, insert today's date so that future requests will be based on that
if not get_start_date(app):
if not get_start_date(kvstore=kvstore):
save_collection_value(
app=app,
kvstore=kvstore,
key=CollectionKeys.START_DATE.value,
value=date.today().isoformat(),
)
Expand All @@ -165,45 +223,45 @@ def save_start_date(app: client.Application, tenant_id: int) -> None:
# If you switch tenants, this will avoid the old tenant from ingesting all the events before today and the day
# that tenant was switched in the first place.
if current_tenant_id != tenant_id:
app.service.kvstore[KV_COLLECTION_NAME].data.update(
kvstore[KV_COLLECTION_NAME].data.update(
id=CollectionKeys.START_DATE.value,
data=json.dumps({"value": date.today().isoformat()}),
)


def save_next(app: client.Application, tenant_id: int, next: Optional[str]) -> None:
def save_next(kvstore: KVStoreCollections, tenant_id: int, next: Optional[str]) -> None:
# If we have a new next value, update the collection for that tenant to continue searching from that point
if not next:
return

save_collection_value(
app=app,
kvstore=kvstore,
key=f"{CollectionKeys.get_next_token(tenantId=tenant_id)}",
value=next,
)


def save_last_fetched(app: client.Application) -> None:
def save_last_fetched(kvstore: KVStoreCollections) -> None:
save_collection_value(
app=app,
kvstore=kvstore,
key=CollectionKeys.TIMESTAMP_LAST_FETCH.value,
value=datetime.now().isoformat(),
)


def get_collection_value(app: client.Application, key: str) -> Optional[str]:
if KV_COLLECTION_NAME in app.service.kvstore:
data = app.service.kvstore[KV_COLLECTION_NAME].data.query()
def get_collection_value(kvstore: KVStoreCollections, key: str) -> Optional[str]:
if KV_COLLECTION_NAME in kvstore:
data = kvstore[KV_COLLECTION_NAME].data.query()
for entry in data:
if entry["_key"] == key:
return entry["value"]

return None


def save_collection_value(app: client.Application, key: str, value: Any) -> None:
if not get_collection_value(app=app, key=key):
app.service.kvstore[KV_COLLECTION_NAME].data.insert(
def save_collection_value(kvstore: KVStoreCollections, key: str, value: Any) -> None:
if not get_collection_value(kvstore=kvstore, key=key):
kvstore[KV_COLLECTION_NAME].data.insert(
json.dumps(
{
"_key": key,
Expand All @@ -212,24 +270,24 @@ def save_collection_value(app: client.Application, key: str, value: Any) -> None
)
)
else:
app.service.kvstore[KV_COLLECTION_NAME].data.update(
kvstore[KV_COLLECTION_NAME].data.update(
id=key,
data=json.dumps({"value": value}),
)


def fetch_feed(
logger: Logger,
app: client.Application,
kvstore: KVStoreCollections,
api_key: str,
tenant_id: int,
ingest_metadata_only: bool,
) -> Iterator[tuple[dict, str]]:
try:
flare_api = FlareAPI(api_key=api_key, tenant_id=tenant_id)

next = get_next(app=app, tenant_id=tenant_id)
start_date = get_start_date(app=app)
next = get_next(kvstore=kvstore, tenant_id=tenant_id)
start_date = get_start_date(kvstore=kvstore)
logger.info(f"Fetching {tenant_id=}, {next=}, {start_date=}")
for event_next in flare_api.fetch_feed_events(
next=next, start_date=start_date, ingest_metadata_only=ingest_metadata_only
Expand Down

0 comments on commit cf026dc

Please sign in to comment.