BITMAKER-2695 Store job stats in a single collection per project (#167)
---------

Co-authored-by: Raymond Negron <[email protected]>
Co-authored-by: emegona <[email protected]>
3 people authored Apr 18, 2023
1 parent d14d7b3 commit afa0e02
Showing 3 changed files with 37 additions and 9 deletions.
database_adapters/db_adapters.py (12 changes: 11 additions & 1 deletion)
@@ -88,7 +88,11 @@ def get_connection(self):
     def delete_collection_data(self, database_name, collection_name):
         collection = self.client[database_name][collection_name]
         count = collection.delete_many({}).deleted_count
-        return count if collection.drop() else 0
+        try:
+            collection.drop()
+        except PyMongoError as ex:
+            print(ex)
+        return count
 
     def get_all_collection_data(self, database_name, collection_name):
         collection = self.client[database_name][collection_name]
@@ -110,6 +114,12 @@ def get_chunked_collection_data(
             del item["_id"]
         return data, next_chunk
 
+    def get_job_stats(self, database_name, collection_name):
+        result = self.client[database_name]["job_stats"].find(
+            {"_id": collection_name}, {"_id": False}
+        )
+        return list(result)
+
     def get_paginated_collection_data(
         self, database_name, collection_name, page, page_size
     ):
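
With this change, each project database keeps a single "job_stats" collection, and every job's stats document is keyed by what used to be that job's own stats collection name. Below is a minimal sketch of the equivalent raw PyMongo query; the connection string, project database name, and the "23-105-job_stats" key are made-up placeholders, not values from this commit.

    # Sketch only: query the per-project "job_stats" collection directly with PyMongo.
    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")  # assumed local MongoDB instance
    project_db = client["example-project-db"]          # placeholder for the project database

    # One document per job, all in the single "job_stats" collection,
    # keyed by the job's former per-job stats collection name.
    stats = list(
        project_db["job_stats"].find({"_id": "23-105-job_stats"}, {"_id": False})
    )
    print(stats)
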
estela-api/api/views/job_data.py (11 changes: 8 additions & 3 deletions)
@@ -162,9 +162,14 @@ def list(self, request, *args, **kwargs):
                 response["next_chunk"] = next_chunk
             return Response(response)
 
-        result = spiderdata_db_client.get_paginated_collection_data(
-            kwargs["pid"], job_collection_name, page, page_size
-        )
+        if data_type == "stats":
+            result = spiderdata_db_client.get_job_stats(
+                kwargs["pid"], job_collection_name
+            )
+        else:
+            result = spiderdata_db_client.get_paginated_collection_data(
+                kwargs["pid"], job_collection_name, page, page_size
+            )
         return Response(
             {
                 "count": count,
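
The view now branches on the requested data type: stats are read from the shared per-project job_stats collection via get_job_stats, while items and other data keep using the paginated per-job lookup. The stand-alone function below only illustrates that dispatch; fetch_job_data and its client argument are hypothetical names, not part of estela's API.

    # Illustrative dispatch, mirroring the branch added to the view above.
    def fetch_job_data(client, pid, job_collection_name, data_type, page, page_size):
        # Stats live in one "job_stats" collection per project, so they are
        # fetched by key instead of by paginating a per-job collection.
        if data_type == "stats":
            return client.get_job_stats(pid, job_collection_name)
        return client.get_paginated_collection_data(
            pid, job_collection_name, page, page_size
        )
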
queueing/inserter.py (23 changes: 18 additions & 5 deletions)
@@ -33,6 +33,9 @@ def __init__(self, client, database_name, collection_name, unique, topic):
 
         logging.info("New Inserter created for {}.".format(self.identifier))
 
+    def is_job_stats(self, collection_name):
+        return "job_stats" == collection_name.split("-")[2]
+
     def __handle_insertion_error(self, response, items):
         logging.warning(
             "The exception [{}] occurred during the insertion of {} items in {}.".format(
@@ -47,11 +50,20 @@ def __handle_insertion_error(self, response, items):
             producer.send(self.topic, item)
 
     def __insert_items(self, reason):
-        response = self.__client.insert_many_to_collection(
-            self.database_name,
-            self.collection_name,
-            [item["payload"] for item in self.__items],
-        )
+        if self.is_job_stats(self.collection_name):
+            self.__items[0]["payload"]["_id"] = self.collection_name
+            response = self.__client.insert_one_to_collection(
+                self.database_name,
+                "job_stats",
+                self.__items[0]["payload"],
+            )
+        else:
+            response = self.__client.insert_many_to_collection(
+                self.database_name,
+                self.collection_name,
+                [item["payload"] for item in self.__items],
+            )
+
         if response.ok:
             logging.info(
                 "{} documents inserted [{}] in {}.".format(
@@ -60,6 +72,7 @@ def __insert_items(self, reason):
             )
         else:
             self.__handle_insertion_error(response, self.__items)
+
         del self.__items[:]
 
     def is_inactive(self):
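
The inserter decides where a batch goes by inspecting the third dash-separated token of its collection name. Assuming names of the form <spider-id>-<job-id>-<kind> (inferred from the split("-")[2] check, not stated in the diff), the sketch below shows the routing rule in isolation; target_collection is a made-up helper, not code from this commit.

    # Sketch of the routing rule, assuming "<spider-id>-<job-id>-<kind>" collection names.
    def is_job_stats(collection_name: str) -> bool:
        return collection_name.split("-")[2] == "job_stats"

    def target_collection(collection_name: str) -> str:
        # Stats batches collapse into the single per-project "job_stats" collection;
        # everything else keeps its own per-job collection.
        return "job_stats" if is_job_stats(collection_name) else collection_name

    assert target_collection("23-105-job_stats") == "job_stats"
    assert target_collection("23-105-items") == "23-105-items"
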
