Skip to content

Commit

Permalink
Merge pull request #244 from amosproj/demo
Browse files Browse the repository at this point in the history
Demo
  • Loading branch information
ultiwinter authored Feb 7, 2024
2 parents 7ae6adf + 2b9f289 commit 4a6ca72
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 192 deletions.
2 changes: 1 addition & 1 deletion src/bdc/steps/google_places.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def get_first_place_candidate(self, query, input_type) -> (dict, int):
return None, 0

if not response["status"] == HTTPStatus.OK.name:
log.warning(
log.debug(
f"Failed to fetch data. Status code: {response['status']}",
)
return None, 0
Expand Down
5 changes: 2 additions & 3 deletions src/bdc/steps/helpers/generate_hash_leads.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import pandas as pd

from bdc.steps.step import Step
from database import get_database
from logger import get_logger

Expand Down Expand Up @@ -56,12 +55,12 @@ def hash_check(

if lead_hash in lookup_table:
# If the hash exists in the lookup table, return the corresponding data
log.info(f"Hash {lead_hash} already exists in the lookup table.")
log.debug(f"Hash {lead_hash} already exists in the lookup table.")
try:
previous_data = lead_data[fields_tofill]
return previous_data
except KeyError as e:
log.info(
log.debug(
f"Hash is present but data fields {fields_tofill} were not found."
)
lookup_table[lead_hash] = lookup_table[lead_hash][:-1] + [
Expand Down
30 changes: 25 additions & 5 deletions src/database/leads/local_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ class LocalRepository(Repository):
DF_OUTPUT = os.path.abspath(
os.path.join(BASE_PATH, "../../data/leads_enriched.csv")
)
DF_HISTORICAL_OUTPUT = os.path.abspath(
os.path.join(BASE_PATH, "../../data/100k_historic_enriched.csv")
)
DF_PREPROCESSED_INPUT = os.path.abspath(
os.path.join(BASE_PATH, "../../data/preprocessed_data_files/")
)
DF_PREDICTION_OUTPUT = os.path.abspath(
os.path.join(BASE_PATH, "../../data/leads_predicted_size.csv")
)
REVIEWS = os.path.abspath(os.path.join(BASE_PATH, "../../data/reviews/"))
SNAPSHOTS = os.path.abspath(os.path.join(BASE_PATH, "../../data/snapshots/"))
GPT_RESULTS = os.path.abspath(os.path.join(BASE_PATH, "../../data/gpt-results/"))
Expand All @@ -51,6 +57,13 @@ def save_dataframe(self):
self.df.to_csv(self.DF_OUTPUT, index=False)
log.info(f"Saved enriched data locally to {self.DF_OUTPUT}")

def save_prediction(self, df):
    """
    Write the prediction dataframe to the configured local CSV output file.

    :param df: pandas DataFrame holding the prediction results to persist.
    """
    output_path = self.DF_PREDICTION_OUTPUT
    # CSV without the index column, matching the other local save methods.
    df.to_csv(output_path, index=False)
    log.info(f"Saved prediction result locally to {output_path}")

def insert_data(self, data):
"""
TODO: Insert new data into specified dataframe
Expand All @@ -68,7 +81,7 @@ def save_review(self, review, place_id, force_refresh=False):
json_file_path = os.path.join(self.REVIEWS, file_name)

if os.path.exists(json_file_path):
log.info(f"Reviews for {place_id} already exist")
log.debug(f"Reviews for {place_id} already exist")
return

with open(json_file_path, "w", encoding="utf-8") as json_file:
Expand Down Expand Up @@ -253,10 +266,17 @@ def save_classification_report(self, report, model_name: str):
except Exception as e:
log.error(f"Could not save report at {report_file_path}! Error: {str(e)}")

def load_preprocessed_data(
self, file_name: str = "historical_preprocessed_data.csv"
):
def get_preprocessed_data_path(self, historical: bool = True):
    """
    Resolve the local file path of a preprocessed-data CSV.

    :param historical: when True (default) return the historical dataset path,
        otherwise the current preprocessed dataset path.
    :return: absolute path inside the preprocessed-data input directory.
    """
    if historical:
        csv_name = "historical_preprocessed_data.csv"
    else:
        csv_name = "preprocessed_data.csv"
    return os.path.join(self.DF_PREPROCESSED_INPUT, csv_name)

def load_preprocessed_data(self, historical: bool = True):
    """
    Load the preprocessed data CSV into a dataframe.

    :param historical: when True (default) load the historical dataset,
        otherwise the current preprocessed dataset.
    :return: pandas DataFrame, or None (after logging) when the file is missing.
    """
    try:
        # Path selection is delegated to get_preprocessed_data_path so the
        # file-name logic lives in exactly one place.
        return pd.read_csv(self.get_preprocessed_data_path(historical))
    except FileNotFoundError:
        # Deliberate best-effort: log and fall through to an implicit None.
        log.error("Error: Could not find input file for preprocessed data.")
29 changes: 27 additions & 2 deletions src/database/leads/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,21 @@ def DF_INPUT(self):
pass

@property
@abstractmethod
def DF_OUTPUT(self):
    """
    Location (local file path or S3 URL) where the enriched leads
    dataframe is written; concrete repositories must override this.
    """
    pass

@property
@abstractmethod
def DF_HISTORICAL_OUTPUT(self):
    """
    Location (local file path or S3 URL) of the historical enriched
    dataframe, used as input for preprocessing; concrete repositories
    must override this.
    """
    pass

@property
@abstractmethod
def REVIEWS(self):
Expand Down Expand Up @@ -65,7 +74,9 @@ def set_dataframe(self, df):
def get_input_path(self):
return self.DF_INPUT

def get_output_path(self):
def get_enriched_data_path(self, historical=False):
    """
    Return the enriched-data output location.

    :param historical: when True return the historical output path,
        otherwise (default) the regular enriched output path.
    """
    return self.DF_HISTORICAL_OUTPUT if historical else self.DF_OUTPUT

@abstractmethod
Expand All @@ -82,6 +93,13 @@ def save_dataframe(self):
"""
pass

@abstractmethod
def save_prediction(self, df):
    """
    Persist the prediction dataframe *df* to the repository's chosen
    prediction output location (implementation-specific: local CSV or S3).
    """
    pass

@abstractmethod
def insert_data(self, data):
"""
Expand Down Expand Up @@ -221,7 +239,14 @@ def save_classification_report(self, report, model_name: str):
pass

@abstractmethod
def load_preprocessed_data(self, file_name: str):
def get_preprocessed_data_path(self, historical: bool = True):
    """
    Return the path of a preprocessed-data file: the historical dataset
    when *historical* is True (default), otherwise the current one.
    """
    pass

@abstractmethod
def load_preprocessed_data(self, historical: bool = True):
"""
Load the preprocessed data from the given file
"""
Expand Down
32 changes: 24 additions & 8 deletions src/database/leads/s3_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class S3Repository(Repository):
MODELS_BUCKET = "amos--models"
DF_INPUT = f"s3://{EVENTS_BUCKET}/leads/enriched.csv"
DF_OUTPUT = f"s3://{EVENTS_BUCKET}/leads/enriched.csv"
DF_HISTORICAL_OUTPUT = (
f"s3://{EVENTS_BUCKET}/historical_data/100k_historic_enriched.csv"
)
DF_PREDICTION_OUTPUT = f"s3://{EVENTS_BUCKET}/leads/leads_predicted_size.csv"
DF_PREPROCESSED_INPUT = f"s3://{FEATURES_BUCKET}/preprocessed_data_files/"
REVIEWS = f"s3://{EVENTS_BUCKET}/reviews/"
SNAPSHOTS = f"s3://{EVENTS_BUCKET}/snapshots/"
Expand Down Expand Up @@ -131,6 +135,16 @@ def save_dataframe(self):
self._save_to_s3(csv_buffer.getvalue(), bucket, obj_key)
log.info(f"Successfully saved enriched leads to s3://{bucket}/{obj_key}")

def save_prediction(self, df):
    """
    Upload the prediction dataframe as a CSV object to the configured
    S3 prediction output location.

    :param df: pandas DataFrame holding the prediction results to persist.
    """
    # Split the configured s3:// URL into its bucket and object key.
    bucket, obj_key = decode_s3_url(self.DF_PREDICTION_OUTPUT)
    # Serialize in memory first, then push the whole payload in one call.
    buf = StringIO()
    df.to_csv(buf, index=False)
    self._save_to_s3(buf.getvalue(), bucket, obj_key)
    log.info(f"Successfully saved prediction result to s3://{bucket}/{obj_key}")

def _save_to_s3(self, data, bucket, key):
s3.put_object(
Bucket=bucket,
Expand Down Expand Up @@ -374,15 +388,17 @@ def save_classification_report(self, report, model_name: str):
except Exception as e:
log.error(f"Could not save report for '{model_name}' to S3: {str(e)}")

def load_preprocessed_data(
self, file_name: str = "historical_preprocessed_data.csv"
):
def get_preprocessed_data_path(self, historical: bool = True):
    """
    Build the S3 URL of a preprocessed-data CSV.

    :param historical: when True (default) use the historical dataset file
        name, otherwise the current preprocessed dataset file name.
    :return: full s3://<BUCKET>/<OBJECT_KEY> URL, or None (after logging an
        error) if the configured prefix is not a valid S3 location.
    """
    if historical:
        csv_name = "historical_preprocessed_data.csv"
    else:
        csv_name = "preprocessed_data.csv"
    file_path = self.DF_PREPROCESSED_INPUT + csv_name
    # Guard against a misconfigured prefix: all S3 paths must be URL-style.
    if not file_path.startswith("s3://"):
        log.error(
            "S3 location has to be defined like this: s3://<BUCKET>/<OBJECT_KEY>"
        )
        return
    return file_path

def load_preprocessed_data(self, historical: bool = True):
file_path = self.get_preprocessed_data_path(historical)

source = None
remote_dataset = None
Expand Down
Loading

0 comments on commit 4a6ca72

Please sign in to comment.