diff --git a/Example/helpers/big_query_class.py b/Example/helpers/big_query_class.py
new file mode 100644
index 0000000..071ae26
--- /dev/null
+++ b/Example/helpers/big_query_class.py
@@ -0,0 +1,145 @@
+"""modules to import"""
+import os
+import re
+import datetime as DT
+import pandas as pd
+from google.cloud import bigquery
+from google.api_core.exceptions import BadRequest
+
+class BigQueryClass:
+    """bigquery class to insert data"""
+    def __init__(self, dataset):
+        self.project_id = os.getenv("PROJECT_ID")
+        self.big_query_client = bigquery.Client(project=self.project_id)
+        self.dataset = dataset
+
+    def raw_table(self, table_name:str) -> str:
+        """return the fully qualified name of the raw bigquery table"""
+        temp_data = f"{self.project_id}.{self.dataset}.{table_name}"
+        return temp_data
+
+    def temp_table(self, table_name:str) -> str:
+        """return a timestamped temporary table name"""
+        timestamp = str(DT.datetime.now())
+        timestamp = timestamp.replace(":", "-")
+        timestamp = timestamp.replace(".", "_")
+        timestamp = timestamp.replace(" ", "__")
+        temp_data = f"{self.project_id}.{self.dataset}.{table_name}_{timestamp}"
+        return temp_data
+
+    def create_table(self
+        , raw_data: str
+        , temp_data: str
+    ) -> None:
+        """create a temporary table with the same schema as the raw table"""
+        query = f"""
+        CREATE TABLE `{temp_data}`
+        LIKE `{raw_data}`
+        OPTIONS(
+            expiration_timestamp = TIMESTAMP_ADD(current_timestamp(), INTERVAL 7 DAY)
+        );"""
+        print(query)
+        query_job = self.big_query_client.query(query)
+        try:
+            return query_job.result()
+        except BadRequest as error:
+            print(error)
+            return None
+
+    def load_dataframe_into_table(
+        self
+        , dataframe:pd.DataFrame
+        , temp_data:str
+        , job_config:bigquery.LoadJobConfig=None
+    ):
+        """load dataframe into table"""
+        print("load df into temp table")
+        job = self.big_query_client.load_table_from_dataframe(
+            dataframe
+            , temp_data
+            , job_config=job_config
+        )
+        return job.result()  # Waits for the job to complete.
+
+    def get_table_schema(self, temp_data:str):
+        """when all goes wrong, print the schema"""
+        table = self.big_query_client.get_table(
+            table=temp_data
+        )
+        schema = table.schema
+        print(schema)
+
+    def truncate_table(self, raw_data:str) -> None:
+        """truncates the table"""
+        query = f"""TRUNCATE TABLE `{raw_data}`"""
+        print(query)
+        query_job = self.big_query_client.query(query)
+        return query_job.result()  # Waits for the job to complete.
+
+    def delete_from_table(self, temp_data:str, raw_data:str, columns:list) -> None:
+        """deletes rows from the raw_data table that are present in temp_data"""
+        delete_query = f"""DELETE FROM `{raw_data}`
+        WHERE """
+        # enumerate() avoids the repeated list scans of columns.index()
+        for position, column in enumerate(columns):
+            if position > 0:
+                delete_query += " AND "
+            delete_query += f" {column} in (select distinct {column} from `{temp_data}`)\n"
+        print(delete_query)
+        query_job = self.big_query_client.query(delete_query)
+        return query_job.result()  # Waits for the job to complete.
+
+    def get_columns_from_table(self, table:str) -> list:
+        """using the information schema to get column names from a table"""
+        split_text = re.split(
+            pattern=r"\."
+ ,string=table + ) + dataset = split_text[1] + table_name = split_text[2] + #query + query = f"""SELECT COLUMN_NAME + FROM `{self.project_id}`.{dataset}.INFORMATION_SCHEMA.COLUMNS + WHERE table_name = '{table_name}';""" + print(query) + query_job = self.big_query_client.query(query) + response = query_job.result() + columns=[] + for row in response: + columns.append(row.COLUMN_NAME) + return columns + + def insert_from_table(self, temp_data:str, raw_data:str, columns:list) -> None: + """insert data from temp_data into raw_data table""" + print("insert data from temporary_data into raw_data") + insert_query = f"INSERT `{raw_data}` (" + for column in columns: + if columns.index(column)>0: + insert_query +="," + insert_query+=f"{column}\n" + insert_query +=")\nSELECT \n" + for column in columns: + if columns.index(column)>0: + insert_query +="," + insert_query+=f"{column}\n" + insert_query += f"FROM `{temp_data}`; " + + print(insert_query) + query_job = self.big_query_client.query(insert_query) + return query_job.result() # Waits for the job to complete. + + def drop_temp_table(self,temp_table:str): + """drop tempoary table""" + query = f"DROP TABLE IF EXISTS `{temp_table}`;" + print(query) + query_job = self.big_query_client.query(query) + return query_job.result() + +if __name__ == '__main__': + bigquery_class = BigQueryClass("Barb_Data") + bigquery_class.get_columns_from_table( + 'phd-solutions-platform-dev.Barb_Data.advertising_spots' + ) diff --git a/Example/helpers/pybarb.py b/Example/helpers/pybarb.py new file mode 100644 index 0000000..63f664b --- /dev/null +++ b/Example/helpers/pybarb.py @@ -0,0 +1,1548 @@ +"""modules to import""" +import re +import os +import time +import tempfile +import json +import requests +import pandas as pd +from helpers.secret_manager import GcpSecretManager +from google.cloud import storage + +storage_client= storage.Client(project=os.getenv("PROJECT")) + +class BarbAPI: + """ + Represents the Barb API. + + Attributes: + api_key (str): The API key for accessing the Barb API. + api_root (str): The root URL of the Barb API. + connected (bool): Whether the Barb API is connected. + headers (dict): The headers for the Barb API. + current_job_id (str): The current job id for the Barb API. + + Methods: + connect: Connects to the Barb API. + get_station_code: Gets the station code for a given station name. + get_viewing_station_code: Gets the station code for a given station name. + get_panel_code: Gets the panel code for a given panel region. + programme_ratings: Gets the programme ratings for a given date range. + advertising_spots: Gets the advertising spots for a given date range. + spot_audience: gets the spot audience report data by day and panel + audiences_by_time: Gets the audiences by time for a given date range. + list_stations: Lists the stations available in the API. + list_viewing_stations: Lists the stations available in the API. + list_panels: Lists the panels available in the API. + list_buyers: Lists the buyers available in the API. + query_asynch_endpoint: Queries the asynch endpoint. + get_asynch_file_urls: Gets the asynch file urls. + get_asynch_files: Gets the asynch files. + ping_job_status: Pings the job status. + """ + + def __init__(self, api_root:str="https://barb-api.co.uk/api/v1/"): + """ + Initializes a new instance of the BarbAPI class. + + Args: + api_key (dict): contains "email" and "password". + api_root (str): The root URL of the Barb API. 
+ + """ + self.api_root = api_root + self.connected = False + self.headers = {} + self.headers["Authorization"]=self.connect() + self.current_job_id = None + + def connect(self): + """ + Connects to the Barb API. + """ + + try: + # Code to connect to the Barb API + self.connected = True + #get secrets + if os.getenv("ENV"): + email = os.getenv("EMAIL") + password = os.getenv("PASSWORD") + else: + secret_manager = GcpSecretManager() + email = secret_manager.get_secret("barb_email") + password = secret_manager.get_secret("barb_password") + + # Code to get an access token from the Barb API + token_request_url = self.api_root + "auth/token/" + response = requests.post( + token_request_url + , data={ + "email":email, + "password":password + } + ,timeout = 300 + ) + print(response.text) + access_token = json.loads(response.text)["access"] + return f"Bearer {access_token}" + + except requests.exceptions.RequestException as e: + print(f"An error occurred: {e}") + self.connected = False + except json.JSONDecodeError: + print("Failed to decode the response from the Barb API.") + self.connected = False + except KeyError: + print("Failed to get access token from the response.") + self.connected = False + + def get_viewing_station_code(self, viewing_station_name:str) ->dict: + """ + Gets the viewing_station code for a given viewing_station name. + + Args: + viewing_station_name (str): The name of the viewing_station to query. + + Returns: + str: The viewing_station code. + """ + + api_url = f"{self.api_root}viewing_stations/" + r = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) + r.raise_for_status() + api_data = r.json() + viewing_station_code = [ + s["viewing_station_code"] + for s in api_data + if viewing_station_name.lower() == s["viewing_station_name"].lower() + ] + if len(viewing_station_code) == 1: + viewing_station_code = viewing_station_code[0] + else: + raise KeyError(f"Viewing station name {viewing_station_name} not found.") + return viewing_station_code + + def programme_ratings( + self, + min_date, + max_date, + station_code:str, + panel_code=None, + consolidated=True, + last_updated_greater_than=None, + use_reporting_days=True, + limit=5000, + ): + """ + Gets the programme ratings for a given date range. + + Args: + min_transmission_date (str): The minimum transmission date to query. + max_transmission_date (str): The maximum transmission date to query. + station (str): The name of the station to query. + panel (str): The name of the panel to query. + consolidated (bool): Whether to return consolidated data. + last_updated_greater_than (str): The last updated date to query. + use_reporting_days (bool): Whether to use reporting days. + limit (int): The maximum number of results to return. + + Returns: + ProgrammeRatingsResultSet: The programme ratings result set. + """ + + # The query parameters + params = { + "min_transmission_date": min_date, + "max_transmission_date": max_date, + "station_code": station_code, + "panel_code": panel_code, + "consolidated": consolidated, + "last_updated_greater_than": last_updated_greater_than, + "use_reporting_days": use_reporting_days, + "limit": limit, + } + + api_response_data = self.query_event_endpoint("programme_ratings", params) + + return ProgrammeRatingsResultSet(api_response_data) + + def programme_schedule( + self, + min_date:str, + max_date:str, + station_code:str, + last_updated_greater_than:str=None, + ): + """ + Gets the programme ratings for a given date range. 
+ + Args: + min_scheduled_date (str): The minimum scheduled date to query. + max_scheduled_date (str): The maximum scheduled date to query. + station (str): The name of the station to query. + last_updated_greater_than (str): The last updated date to query. + + Returns: + ProgrammeScheduleResultSet: The programme ratings result set. + """ + + # The query parameters + params = { + "min_scheduled_date": min_date, + "max_scheduled_date": max_date, + "station_code": station_code, + "last_updated_greater_than": last_updated_greater_than, + } + + api_response_data = self.query_bulk_endpoint("programme_schedule", params) + + return ProgrammeScheduleResultSet(api_response_data) + + def programme_audience( + self, + min_date, + max_date, + panel_code:int, + ): + """ + Gets the programme audience for a given date range. + + Args: + min_sesssion_date (str): The minimum transmission date to query. + max_sesssion_date (str): The maximum transmission date to query. + panel_code (str): The panel code to query. + + Returns: + ProgrammeAudienceResultSet: The programme audience result set. + """ + + # The query parameters + params = { + "min_session_date": min_date, + "max_session_date": max_date, + "panel_code": panel_code, + } + + api_response_data = self.query_bulk_endpoint( + endpoint="bulk/programme_audience", + parameters=params, + method="GET" + ) + if api_response_data is None: + string=f"no data recieved for {panel_code} for {min_date} and {max_date}" + print(string) + # raise Warning(string) + return None + + return GoogleBucketResultSet(api_response_data, endpoint="programme_audience") + + def advertising_spots( + self, + min_transmission_date:str, + max_transmission_date:str, + station_code=None, + panel_code=None, + advertiser=None, + buyer=None, + consolidated=True, + standardise_audiences=None, + use_reporting_days=True, + last_updated_greater_than=None, + limit=5000, + ): + """ + Gets the advertising spots for a given date range. + + Args: + min_transmission_date (str): The minimum transmission date to query. + max_transmission_date (str): The maximum transmission date to query. + station_code (str): The code of the station to query. + panel_code (str): The code of the panel to query. + advertiser (str): The name of the advertiser to query. + buyer (str): The name of the buyer to query. + consolidated (bool): Whether to return consolidated data. + standardise_audiences (bool): Whether to standardise audiences. + use_reporting_days (bool): Whether to use reporting days. + last_updated_greater_than (str): The last updated date to query. + limit (int): The maximum number of results to return. + + Returns: + AdvertisingSpotsResultSet: The advertising spots result set. + """ + + # The query parameters + params = { + "min_transmission_date": min_transmission_date, + "max_transmission_date": max_transmission_date, + "station_code": station_code, + "panel_code":panel_code, + "advertiser_name": advertiser, + "buyer_name": buyer, + "consolidated": consolidated, + "standardise_audiences": standardise_audiences, + "use_reporting_days": use_reporting_days, + "last_updated_greater_than": last_updated_greater_than, + "limit": limit, + } + api_response_data = self.query_event_endpoint("advertising_spots", params) + + return AdvertisingSpotsResultSet(api_response_data) + + def spot_audience(self, + min_date:str, + max_date:str, + panel_code): + """ + Gets the advertising spots for a given date range. + + Args: + min_transmission_date (str): The minimum transmission date to query. 
+ max_transmission_date (str): The maximum transmission date to query. + panel_code (str): The code of the panel to query. + + Returns: + SpotAudienceResultSet: The advertising spots result set. + """ + + # The query parameters + params = { + "min_session_date": min_date, + "max_session_date": max_date, + "panel_code":panel_code, + } + + api_response_data = self.query_bulk_endpoint( + endpoint="bulk/spot_audience", + parameters=params, + method="GET" + ) + print("api_response_data", api_response_data) + + if api_response_data is None: + string=f"no data recieved for {panel_code} for {min_date} and {max_date}" + #raise Warning(string) + return None + return GoogleBucketResultSet(api_response_data,endpoint="spot_audience") + + def spot_schedule(self + ,min_date:str + ,max_date:str + ,station_code:int + ,last_updated_gt:str=None): + """ + request the spot schedule endpoint + """ + # The query parameters + params = { + "min_scheduled_date": min_date, + "max_scheduled_date": max_date, + "station_code": station_code, + } + if last_updated_gt: + params["last_updated_greater_than"]= last_updated_gt + api_response_data = self.query_bulk_endpoint("spot_schedule", params) + return SpotScheduleResultSet(api_response_data) + + def audiences_by_time( + self, + min_transmission_date, + max_transmission_date, + time_period_length, + viewing_status, + station_code=None, + panel_code=None, + use_polling_days=True, + last_updated_greater_than=None, + limit=5000, + ): + """ + Gets the audiences by time for a given date range. + + Args: + min_transmission_date (str): The minimum transmission date to query. + max_transmission_date (str): The maximum transmission date to query. + time_period_length (str): The time period length to query. + viewing_status (str): The viewing status to query. + + station (str): The name of the station to query. + panel (str): The name of the panel to query. + use_polling_days (bool): Whether to use polling days. + last_updated_greater_than (str): The last updated date to query. + limit (int): The maximum number of results to return. + + Returns: + AudiencesByTimeResultSet: The audiences by time result set. + """ + + # The query parameters + params = { + "min_transmission_date": min_transmission_date, + "max_transmission_date": max_transmission_date, + "station_code": station_code, + "panel_code": panel_code, + "time_period_length": time_period_length, + "viewing_status": viewing_status, + "use_polling_days": use_polling_days, + "last_updated_greater_than": last_updated_greater_than, + "limit": limit, + } + + api_response_data = self.query_event_endpoint("audiences_by_time", params) + + return AudiencesByTimeResultSet(api_response_data) + + def viewing( + self, + min_date:str, + max_date:str, + panel_code:str, + #viewing_station=None, + #activity_type=None, + #last_updated_greater_than=None, + #output_format="parquet", + #limit=5000, + ): + """ + Gets the viewing for a given date range. + + Args: + min_date (str): The minimum session date to query. + max_date (str): The maximum session date to query. + viewing_station (str): The name of the viewing_station to query. + panel (str): The name of the panel to query. + activity_type (str): The activity type to query. + last_updated_greater_than (str): The last updated date to query. + output_format (str): The output format to query. + limit (int): The maximum number of results to return. + + Returns: + ViewingResultSet: The viewing result set. 
+ """ + + # The query parameters + params = { + "min_session_date": min_date, + "max_session_date": max_date, + "panel_code": panel_code, + # "viewing_station_code": None + # if viewing_station is None + # else self.get_viewing_station_code(viewing_station), + # "output_format": output_format, + # "limit": limit, + } + + # if activity_type is not None: + # params["activity_type"] = activity_type + + # if last_updated_greater_than is not None: + # params["last_updated_greater_than"] = last_updated_greater_than + + api_response_data = self.query_bulk_endpoint( + endpoint="bulk/viewing/", + parameters=params, + method="GET" + ) + if api_response_data is None: + string=f"no data recieved for {panel_code} for {min_date} and {max_date}" + if os.getenv("ENV"): + print(string) + else: + raise Warning(string) + return None + return GoogleBucketResultSet(api_response_data,endpoint="viewing") + + def query_event_endpoint(self, endpoint, parameters,method="GET"): + """ + Queries the event endpoint. + Args: + endpoint (str): The endpoint to query. + parameters (dict): The query parameters. + + Returns: + dict: The API response data. + """ + api_response_data = {"endpoint": endpoint, "events": []} + try: + api_url = f"{self.api_root}{endpoint}" + r = requests.request( + url=api_url + , params=parameters + , headers=self.headers + , timeout=300 + , method=method + ) + r.raise_for_status() + + # If the response is not 200 then raise an exception + if r.status_code != 200: + raise requests.HTTPError(f"Error: {r.status_code} - {r.text}") + + r_json = r.json() + #print(r_json) + + # If events is not in the response then raise an exception + if "events" not in r_json.keys(): + raise KeyError(f"Error: {r_json['message']}") + + # If events is empty then raise an exception + if len(r_json["events"]) == 0: + return api_response_data + + api_response_data = {"endpoint": endpoint, "events": r_json["events"]} + count=0 + while "X-Next" in r.headers: + x_next_url = r.headers["X-Next"] + r = requests.get( + url=x_next_url + , headers=self.headers + ,timeout=300 + ) + r.raise_for_status() + r_json = r.json() + api_response_data["events"] = ( + api_response_data["events"] + r_json["events"] + ) + print(count) + count+=1 + + return api_response_data + + except requests.exceptions.RequestException as e: + raise requests.exceptions.RequestException(f"An error occurred: {e}") + except json.JSONDecodeError: + print("Failed to decode the response.") + return api_response_data + + def list_stations(self, regex_filter=None): + """ + Lists the stations available in the API. + + Returns: + list: The stations result set. + """ + + api_url = f"{self.api_root}stations" + try: + api_response_data = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) + api_response_data.raise_for_status() + list_of_stations = [] + for x in api_response_data.json(): + list_of_stations.append(x) + + if len(list_of_stations) == 0: + raise requests.RequestException("Error: No stations returned.") + + if regex_filter is not None: + regex = re.compile(regex_filter, flags=re.IGNORECASE) + list_of_stations = list(filter(regex.search, list_of_stations)) + + return list_of_stations + except requests.RequestException as req_error: + print(api_response_data.content) + raise requests.RequestException("error") from req_error + + def list_viewing_stations(self, regex_filter=None): + """ + Lists the stations available in the API. + + Returns: + list: The stations result set. 
+ """ + + api_url = f"{self.api_root}viewing_stations" + + api_response_data = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) + + list_of_stations = list(api_response_data.json()) + + if len(list_of_stations) == 0: + raise requests.RequestException("Error: No stations returned.") + + if regex_filter is not None: + regex = re.compile(regex_filter, flags=re.IGNORECASE) + list_of_stations = list(filter(regex.search, list_of_stations)) + + return list_of_stations + + def list_panels(self, regex_filter=None) ->[list,None]: + """ + Lists the panels available in the API. + + Returns: + list: The panels result set. + """ + + api_url = f"{self.api_root}panels" + api_response_data = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) + + list_of_panels = list(api_response_data.json()) + + if len(list_of_panels) == 0: + raise requests.RequestException("Error: No panels returned.") + + if regex_filter is not None: + regex = re.compile(regex_filter, flags=re.IGNORECASE) + list_of_panels = list(filter(regex.search, list_of_panels)) + + return list_of_panels + + def list_buyers(self, regex_filter=None): + """ + Lists the buyers available in the API. + + Returns: + list: The buyers result set. + """ + + api_url = f"{self.api_root}buyers" + api_response_data = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) + + list_of_buyers = api_response_data.json() + + if len(list_of_buyers) == 0: + raise requests.RequestException("Error: No buyers returned.") + + if regex_filter is not None: + regex = re.compile(regex_filter, flags=re.IGNORECASE) + list_of_buyers = list(filter(regex.search, list_of_buyers)) + + return list_of_buyers + + def list_advertisers(self, regex_filter=None): + """ + Lists the advertisers available in the API. + + Returns: + list: The advertisers result set. + """ + + api_url = f"{self.api_root}advertisers" + api_response_data = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) + + list_of_advertisers = [a["advertiser_name"] for a in api_response_data.json()] + + if len(list_of_advertisers) == 0: + raise requests.RequestException("Error: No advertisers returned.") + + if regex_filter is not None: + regex = re.compile(regex_filter, flags=re.IGNORECASE) + list_of_advertisers = list(filter(regex.search, list_of_advertisers)) + + return list_of_advertisers + + def query_asynch_endpoint(self, endpoint, parameters,method="POST"): + """ + Queries the asynch endpoint. + + Args: + endpoint (str): The endpoint to query. + parameters (dict): The query parameters. + + Returns: + dict: The API response data. + """ + + api_url = f"{self.api_root}{endpoint}" + + # Query the API and turn the response into json + try: + r = requests.request( + url=api_url + , json=parameters + , headers=self.headers + ,timeout=300 + ,method=method + ) + r.raise_for_status() + r_json = r.json() + print(r_json) + self.current_job_id = r_json["job_id"] + return r_json + except requests.exceptions.RequestException as e: + print(f"An error occurred: {e}") + except json.JSONDecodeError: + print("Failed to decode the response.") + return None + + def get_asynch_file_urls(self, job_id=None): + """ + Gets the asynch file urls. + + Args: + job_id (str): The job id to query. + + Returns: + list: The asynch file urls. 
+ """ + + if job_id is None: + job_id = self.current_job_id + + try: + api_url = f"{self.api_root}async-batch/results/{job_id}" + r = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) + r.raise_for_status() + r_json = r.json() + if r_json["status"] == "started": + return False + urls = [x["data"] for x in r_json["result"]] + if len(urls) == 0: + raise IndexError("Error: No urls returned.") + return urls + except requests.exceptions.RequestException as e: + print(f"An error occurred: {e}") + except json.JSONDecodeError: + print("Failed to decode the response.") + return None + + def get_asynch_files(self): + """ + Gets the asynch files. + + Returns: + ViewingResultSet: The viewing result set. + """ + + try: + results = pd.DataFrame() + for file in self.current_file_urls: + df = pd.read_parquet(file) + results = pd.concat([results, df]) + return ViewingResultSet(results) + except pd.errors.DataError as error: + print("Failed to get the asynch files.") + print(error) + return None + + def ping_job_status(self, job_id=None): + """ + Pings the job status. + + Args: + job_id (str): The job id to query. + + Returns: + list: The asynch file urls. + """ + + if job_id is None: + job_id = self.current_job_id + + while self.get_asynch_file_urls(job_id) is False: + print("Job not ready yet. Sleeping for 60 seconds.") + time.sleep(60) + + self.current_file_urls = self.get_asynch_file_urls(job_id) + + print( + f"Job complete. {len(self.current_file_urls)} files are ready for download." + ) + + def query_bulk_endpoint(self, endpoint:str, parameters:dict,method="GET")->list[dict]: + """ + Queries the asynch endpoint. + + Args: + endpoint (str): The endpoint to query. + parameters (dict): The query parameters. + + Returns: + dict: The API response data. + """ + + api_url = f"{self.api_root}{endpoint}" + data_list=[] + next_token=True + # Query the API and turn the response into json + try: + while next_token: + r = requests.request( + url = api_url + , params = parameters + , headers = self.headers + ,timeout = 300 + ,method = method + ) + r.raise_for_status() + r_json = r.json() + #print("json output:",r_json) + #append data + if isinstance(r_json, list): + for item in r_json: + data_list.append(item) + elif isinstance(r_json,dict): + data_list.append(item) + else: + raise TypeError("Wrong data type for",r_json) + #continue? + if "X-Next" in list(r.headers.keys()): + api_url = r.headers["X-Next"] + else: + next_token=False + return data_list + except requests.exceptions.RequestException as e: + print(r.content) + print(r.url) + raise Warning(f"An error occurred: {e}") from e + except json.JSONDecodeError: + print("Failed to decode the response.") + return None + +class APIResultSet: + """ + Represents a result set from the Barb API. + """ + + def __init__(self, api_response_data: dict): + """ + Initialises a new instance of the APIResultSet class. + + Args: + api_response_data (dict): The API response data. + """ + + self.api_response_data = api_response_data + +class BulkResultSet: + """ + Respresents the bulk result set from the Barb API + """ + def __init__(self, api_response_data: list[dict]): + """ + Initialises a new instance of the BulkResultSet class. + + Args: + api_response_data (list[dict]): The API response data. + """ + + self.api_response_data = api_response_data + +class ProgrammeRatingsResultSet(APIResultSet): + """ + Represents a programme ratings result set from the Barb API. 
+ """ + + def to_dataframe(self) ->pd.DataFrame: + """ + Converts the API response data into a pandas dataframe. + + Returns: + pandas.DataFrame: A dataframe containing the API response data. + + """ + + # if len(self.api_response_data["events"]) == 0: + # raise Warning("Error: No events returned.") + + # Loop through the events and then the audiences within the events + df_data = [] + for e in self.api_response_data["events"]: + try: + e:dict + df_data.append( + { + "panel_code": e.get("panel",{}).get("panel_code"), + "panel_region": e.get("panel",{}).get("panel_region"), + "is_macro_region": e.get("panel",{}).get("is_macro_region"), + "station_code": e.get("station",{}).get("station_code"), + "station_name": e.get("station",{}).get("station_name"), + "prog_name": e.get("transmission_log_programme_name"), + "programme_type": e.get("programme_type"), + "programme_start_datetime": e.get("programme_start_datetime",{}).get( + "standard_datetime" + ), + "programme_duration_minutes": e.get("programme_duration"), + "spans_normal_day": e.get("spans_normal_day"), + "sponsor_code": e.get("sponsor",{}).get("sponsor_code"), + "bumpers_included": e.get("sponsor",{}).get("bumpers_included"), + "broadcaster_transmission_code": e.get("broadcaster_transmission_code"), + "live_status": e.get("live_status"), + "uk_premiere": e.get("uk_premier"), + "broadcaster_premiere": e.get("broadcaster_premier"), + "programme_repeat": e.get("repeat"), + "episode_name": None if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("episode",{}).get( + "episode_name" + ), + "episode_number": -1 if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("episode",{}).get( + "episode_number" + ), + "series_number": -1 if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("series",{}).get( + "series_number" + ), + "number_of_episodes": -1 if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("series",{}).get( + "number_of_episodes" + ), + "broadcaster_series_id": -1 if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("series",{}).get( + "broadcaster_series_id" + ), + "genre": None if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("genre"), + "platforms": e.get("platforms",[]), + "audience_views": e.get("audience_views",[{}]), + } + ) + except AttributeError as error: + print(e) + raise AttributeError(error) from error + except KeyError as error: + print(e) + raise KeyError(error) from error + # Convert the result into a data frame + columns_dict={ + "panel_code": "string", + "panel_region": "string", + "is_macro_region": "bool", + "station_code": "string", + "station_name": "string", + "prog_name": "string", + "programme_type": "string", + "programme_start_datetime": "datetime64[ns]", + "programme_duration_minutes": "int64", + "spans_normal_day": "bool", + "sponsor_code": "string", + "bumpers_included": "bool", + "broadcaster_transmission_code": "string", + "live_status": "string", + "uk_premiere": "bool", + "broadcaster_premiere": "bool", + "programme_repeat": "bool", + "episode_name": "string", + "episode_number": "string", + "series_number": "string", + "number_of_episodes": "string", + "broadcaster_series_id": "string", + "genre": "string", + "platforms": "object", + "audience_views": "object", + #"date_of_transmission": "datetime64[ns]" + } + #verify dtypes + if 
len(df_data)>0: + verifry_class = VerifyDtypes() + df_data = verifry_class.verify_dtypes( + data = df_data + ,column_dtypes = columns_dict + ) + df = pd.DataFrame(df_data, columns=list(columns_dict.keys())) + + if not df.empty: + # Format the transmission_time_period as a pandas datetime + df["programme_start_datetime"] = pd.to_datetime(df["programme_start_datetime"]) + df["date_of_transmission"] = df["programme_start_datetime"].dt.date + #dtypes for each column + df=df.astype(dtype=columns_dict) + + return df + +class ProgrammeScheduleResultSet(BulkResultSet): + """ + Represents a programme ratings result set from the Barb API. + """ + + def to_dataframe(self) ->pd.DataFrame: + """ + Converts the API response data into a pandas dataframe. + + Returns: + pandas.DataFrame: A dataframe containing the API response data. + + """ + + # if len(self.api_response_data["events"]) == 0: + # raise Warning("Error: No events returned.") + + # Loop through the events and then the audiences within the events + df_data = [] + + try: + for item in self.api_response_data: + item:dict + for e in item.get("station_schedule",{}): + e:dict + df_data.append( + { + "scheduled_date": item.get("scheduled_date"), + "station_code": item.get("station",{}).get( + "station_code" + ), + "station_name": item.get("station",{}).get( + "station_name" + ), + "panel_code": item.get("panel",{}).get("panel_code"), + "panel_region": item.get("panel",{}).get("panel_region"), + "is_macro_region": item.get("panel",{}).get( + "is_macro_region" + ), + "broadcaster_premier": e.get("broadcaster_premier"), + "broadcaster_transmission_code": e.get("broadcaster_transmission_code"), + "live_status": e.get("live_status"), + "platforms": e.get("platforms",[]), + "content_name": e.get("programme_content").get("content_name"), + "barb_content_id": e.get("programme_content").get("barb_content_id"), + "broadcaster_content_id": e.get("programme_content").get( + "broadcaster_content_id"), + "metabroadcast_content_id": e.get("programme_content").get( + "metabroadcast_information").get("metabroadcast_content_id"), + "episode_number": e.get("programme_content").get( + "episode").get("episode_number"), + "episode_name": e.get("programme_content").get( + "episode").get("episode_name"), + "series_number": e.get("programme_content").get( + "series").get("series_number"), + "number_of_episodes": e.get("programme_content").get( + "series").get("number_of_episodes"), + "broadcaster_series_id": e.get("programme_content").get( + "series").get("broadcaster_series_id"), + "genre": e.get("programme_content").get("genre"), + "programme_duration": e.get("programme_duration"), + "barb_reporting_datetime": e.get("programme_start_datetime").get( + "barb_reporting_datetime"), + "barb_polling_datetime": e.get("programme_start_datetime").get( + "barb_polling_datetime"), + "standard_datetime": e.get("programme_start_datetime").get( + "standard_datetime"), + "programme_type": e.get("programme_type"), + "repeat": e.get("repeat"), + "spans_normal_day": e.get("spans_normal_day"), + "sponsor_code": e.get("sponsor").get("sponsor_code"), + "bumpers_included": e.get("sponsor").get("bumpers_included"), + "transmission_log_programme_name": e.get("transmission_log_programme_name"), + "uk_premier": e.get("uk_premier") + } + ) + except KeyError as error: + print(error) + # Convert the result into a data frame + columns_dict={ + "scheduled_date": "datetime64[ns]", + "station_code": "string", + "station_name": "string", + "panel_code": "string", + "panel_region": "string", + 
"is_macro_region": "bool", + "broadcaster_premier": "bool", + "broadcaster_transmission_code": "string", + "live_status": "string", + "platforms": "object", + "content_name": "string", + "barb_content_id": "string", + "broadcaster_content_id": "string", + "metabroadcast_content_id": "string", + "episode_number": "int64", + "episode_name": "string", + "series_number": "string", + "number_of_episodes": "int64", + "broadcaster_series_id": "string", + "genre": "string", + "programme_duration": "int64", + "barb_reporting_datetime": "string", + "barb_polling_datetime": "string", + "standard_datetime": "datetime64[ns]", + "programme_type": "string", + "repeat": "bool", + "spans_normal_day": "bool", + "sponsor_code": "string", + "bumpers_included": "bool", + "transmission_log_programme_name": "string", + "uk_premier":"bool" + } + #verify dtypes + if len(df_data)>0: + verifry_class = VerifyDtypes() + verifry_class.verify_dtypes( + data=df_data + ,column_dtypes=columns_dict + ) + df = pd.DataFrame(df_data, columns=list(columns_dict.keys())) + + if not df.empty: + #dtypes for each column + df = df.astype(dtype=columns_dict) + + return df + +class AdvertisingSpotsResultSet(APIResultSet): + """ + Represents an advertising spots result set from the Barb API. + """ + + def to_dataframe(self) ->pd.DataFrame: + """ + Converts the API response data into a pandas dataframe. + + Returns: + pandas.DataFrame: A dataframe containing the API response data. + + """ + + #if len(self.api_response_data["events"]) == 0: + # raise Exception("Error: No events returned.") + + try: + # Loop through the events and then the audiences within the events + spot_data = [] + for e in self.api_response_data.get("events",[{}]): + dict_event = dict(e) + spot_data.append( + { + "panel_region": e["panel"]["panel_region"], + "panel_code": dict_event["panel"]["panel_code"], + "station_name": dict_event["station"]["station_name"], + "station_code": dict_event["station"]["station_code"], + "spot_type": dict_event["spot_type"], + "spot_start_datetime": dict_event.get("spot_start_datetime",{}).get( + "standard_datetime"), + "spot_duration": dict_event["spot_duration"], + "preceding_programme_name": dict_event["preceding_programme_name"], + "succeeding_programme_name": dict_event["succeeding_programme_name"], + "break_type": dict_event["break_type"], + "position_in_break": dict_event["position_in_break"], + "broadcaster_spot_number": dict_event["broadcaster_spot_number"], + "commercial_number": dict_event["commercial_number"], + "clearcast_commercial_title": dict_event.get( + "clearcast_information",{}).get( + "clearcast_commercial_title",None), + "clearcast_match_group_code": dict_event.get( + "clearcast_information",{}).get( + "match_group_code",None), + "clearcast_match_group_name": dict_event.get( + "clearcast_information",{}).get( + "match_group_name",None), + "clearcast_buyer_code": dict_event.get("clearcast_information",{}).get( + "buyer_code",None), + "clearcast_buyer_name": dict_event.get("clearcast_information",{}).get( + "buyer_name",None), + "clearcast_advertiser_code": dict_event.get("clearcast_information",{}).get( + "advertiser_code",None), + "clearcast_advertiser_name": dict_event.get("clearcast_information",{}).get( + "advertiser_name",None), + "campaign_approval_id": dict_event["campaign_approval_id"], + "sales_house_name": dict_event.get("sales_house",{}).get( + "sales_house_name"), + "audience_views": dict_event.get("audience_views",[{}]), + } + ) + # Convert the result into a data frame + columns=[ + "panel_region", + 
"panel_code", + "station_name", + "station_code", + "spot_type", + "spot_start_datetime", + "spot_duration", + "preceding_programme_name", + "succeeding_programme_name", + "break_type", + "position_in_break", + "broadcaster_spot_number", + "commercial_number", + "clearcast_commercial_title", + "clearcast_match_group_code", + "clearcast_match_group_name", + "clearcast_buyer_code", + "clearcast_buyer_name", + "clearcast_advertiser_code", + "clearcast_advertiser_name", + "campaign_approval_id", + "sales_house_name", + "audience_views" + ] + spot_data = pd.DataFrame(data=spot_data, columns=columns) + + # Format the transmission_time_period as a pandas datetime + spot_data["spot_start_datetime"] = pd.to_datetime( + spot_data["spot_start_datetime"] + ) + spot_data["date_of_transmission"] = spot_data["spot_start_datetime"].dt.date + + #set dtypes + if not spot_data.empty: + spot_data=self.dataframe_set_dtypes(spot_data) + return spot_data + except pd.errors.DataError as error: + print(error) + return None + + def dataframe_set_dtypes(self,dataframe:pd.DataFrame): + """sets the dtypes for the columns""" + dtypes_dict={ + "panel_region":"string", + "panel_code":"int64", + "station_name":"string", + "station_code":"int64", + "spot_type":"string", + #"spot_start_datetime":"datetime64[ns]", + "spot_duration":"int64", + "preceding_programme_name":"string", + "succeeding_programme_name":"string", + "break_type":"string", + "position_in_break":"string", + "broadcaster_spot_number":"string", + "commercial_number":"string", + "clearcast_commercial_title":"string", + "clearcast_match_group_code":"string", + "clearcast_match_group_name":"string", + "clearcast_buyer_code":"string", + "clearcast_buyer_name":"string", + "clearcast_advertiser_code":"string", + "clearcast_advertiser_name":"string", + "campaign_approval_id":"string", + "sales_house_name":"string", + #"audience_views":"object", + #"date_of_transmission":"datetime64[ns]" + } + dataframe=dataframe.astype(dtype=dtypes_dict) + return dataframe + +class AudiencesByTimeResultSet(APIResultSet): + """ + Represents an audiences by time result set from the Barb API. + """ + + def to_dataframe(self) -> pd.DataFrame: + """ + Converts the API response data into a pandas dataframe. + + Returns: + pandas.DataFrame: A dataframe containing the API response data. 
+ + """ + + # if len(self.api_response_data.get("events",[]) == 0: + # raise Exception("Error: No events returned.") + + try: + # Loop through the events and then the audiences within the events + audience_data = [] + for e in self.api_response_data.get("events",[]): + e:dict + for v in e.get("audience_views",[]): + v:dict + audience_data.append( + { + "date_of_transmission": e.get("date_of_transmission"), + "panel_code": e.get("panel",{}).get("panel_code"), + "panel_region": e.get("panel",{}).get("panel_region"), + "is_macro_region": e.get("panel",{}).get("is_macro_region"), + "station_code": e.get("station",{}).get("station_code"), + "station_name": e.get("station",{}).get("station_name"), + "activity": e.get("activity"), + "transmission_time_period_duration_mins": e.get( + "transmission_time_period_duration_mins"), + "transmission_time_period_start": e.get( + "transmission_time_period_start",{} + ).get("standard_datetime"), + "platforms": e.get("platforms"), + "audience_code": v.get("audience_code"), + "audience_size_hundreds": v.get("audience_size_hundreds"), + "category_id": v.get("category_id"), + "audience_name": v.get("description"), + "audience_target_size_hundreds": v.get( + "target_size_in_hundreds" + ), + } + ) + # Convert the result into a data frame + columns_dict={ + "date_of_transmission": "datetime64[ns]", + "panel_code": "string", + "panel_region": "string", + "is_macro_region": "bool", + "station_code": "string", + "station_name": "string", + "activity": "string", + "transmission_time_period_duration_mins": "int64", + "transmission_time_period_start": "datetime64[ns]", + "platforms": "object", + "audience_code": "string", + "audience_size_hundreds": "int64", + "category_id": "string", + "audience_name": "string", + "audience_target_size_hundreds": "int64", + } + #verify dtypes + if len(audience_data)>0: + verifry_class = VerifyDtypes() + audience_data = verifry_class.verify_dtypes( + data = audience_data + ,column_dtypes = columns_dict + ) + audience_data = pd.DataFrame(audience_data, columns=list(columns_dict.keys())) + + # Format the transmission_time_period as a pandas datetime + if not audience_data.empty: + audience_data["transmission_time_period_start"] = pd.to_datetime( + audience_data["transmission_time_period_start"] + ) + audience_data = audience_data.astype(dtype=columns_dict) + + return audience_data + except pd.errors.DataError as error: + print(error) + return None + +class GoogleBucketResultSet(BulkResultSet): + """Represents spot Audience result set from the Barb API""" + + def __init__(self, api_response_data: list[dict], endpoint:str): + """initalise class""" + super().__init__(api_response_data) + self.endpoint =endpoint + + def download_parquet(self) -> None: + """download the parquet""" + print("response_data",self.api_response_data) + for results_item in self.api_response_data: + print("results_item",results_item) + date = results_item.get("session_date") + panel = results_item.get("panel_code") + for i,link in enumerate(results_item.get("results",[])): + print(link) + with tempfile.TemporaryDirectory() as td: + r=requests.get(link,timeout=300) + r.raise_for_status() + print(r.headers) + with open(f"{td}/demo_{i}.parquet",mode="wb") as f: + f.write(r.content) + print(f.name) + self.parquet_to_google_bucket( + file=f"{f.name}" + ,date=date + ,panel_code=panel + ) + return "ok" + + def parquet_to_google_bucket(self, file:str, date:str, panel_code:int) -> None: + """upload the parquet to the google bucket""" + bucket_name = os.getenv("BUCKET") + bucket 
= storage_client.bucket(bucket_name)  # bucket() builds a reference without an extra API call
+        result = re.search(r"demo_[0-9]*\.parquet$", file)
+        file_name = result[0]
+        blob = bucket.blob(f"{self.endpoint}/date={date}/panel_code={panel_code}/{file_name}")
+        blob.upload_from_filename(filename=file)
+        print("file uploaded")
+
+class SpotScheduleResultSet(BulkResultSet):
+    """
+    Represents a spot schedule result set from the Barb API.
+    """
+    def to_dataframe(self) -> pd.DataFrame:
+        """
+        Converts the API response data into a pandas dataframe.
+
+        Returns:
+            pandas.DataFrame: A dataframe containing the API response data.
+
+        """
+
+        try:
+            # Loop through the scheduled days and the spots within each day
+            spot_data = []
+            for e in self.api_response_data:
+                for item in e.get("spot_schedule",[{}]):
+                    item:dict
+                    platforms = []
+                    for platform in item.get("platforms",[]):
+                        platforms.append(str(platform))
+                    spot_data.append(
+                        {
+                            "scheduled_date": e.get("scheduled_date"),
+                            "station_code": str(e.get("station",{}).get("station_code",{})),
+                            "station_name": str(e.get("station",{}).get("station_name",{})),
+                            "panel_code": str(e.get("panel",{}).get("panel_code",{})),
+                            "panel_region": str(e.get("panel",{}).get("panel_region",{})),
+                            "is_macro_region": str(e.get("panel",{}).get("is_macro_region",{})),
+                            "break_type": str(item.get("break_type")),
+                            "broadcaster_spot_number": str(item.get("broadcaster_spot_number")),
+                            "campaign_approval_id": str(item.get("campaign_approval_id")),
+                            "match_group_code": str(item["clearcast_information"].get(
+                                "match_group_code")),
+                            "match_group_name": str(item["clearcast_information"].get(
+                                "match_group_name")),
+                            "buyer_code": str(item["clearcast_information"].get("buyer_code")),
+                            "buyer_name": str(item["clearcast_information"].get("buyer_name")),
+                            "advertiser_code": str(item["clearcast_information"].get(
+                                "advertiser_code")),
+                            "advertiser_name": str(item["clearcast_information"].get(
+                                "advertiser_name")),
+                            "holding_company_code": str(item["clearcast_information"].get(
+                                "holding_company_code")),
+                            "holding_company_name": str(item["clearcast_information"].get(
+                                "holding_company_name")),
+                            "product_code": str(item["clearcast_information"].get(
+                                "product_code")),
+                            "product_name": str(item["clearcast_information"].get(
+                                "product_name")),
+                            "clearcast_commercial_title": str(item["clearcast_information"].get(
+                                "clearcast_commercial_title")),
+                            "commercial_spot_length": str(item["clearcast_information"].get(
+                                "commercial_spot_length")),
+                            "clearcast_web_address": str(item["clearcast_information"].get(
+                                "clearcast_web_address")),
+                            "commercial_number": str(item.get("commercial_number")),
+                            "platforms": json.dumps(platforms),
+                            "position_in_break": str(item.get("position_in_break")),
+                            "preceeding_programme_name": str(item.get("preceeding_programme_name")),
+                            "sales_house_name": str(item["sales_house"].get(
+                                "sales_house_name")),
+                            "sales_house_brand_description": str(item["sales_house"].get(
+                                "sales_house_brand_description")),
+                            "spot_duration": str(item.get("spot_duration")),
+                            "barb_reporting_datetime": item["spot_start_datetime"].get(
+                                "barb_reporting_datetime"),
+                            "barb_polling_datetime": item["spot_start_datetime"].get(
+                                "barb_polling_datetime"),
+                            "standard_datetime": item["spot_start_datetime"].get(
+                                "standard_datetime"),
+                            "spot_type": str(item.get("spot_type")),
+                            "succeeding_programme_name": str(item.get("succeeding_programme_name"))
+                        }
+                    )
+            # Convert the result into a data frame
+            columns_dict = {
+                "scheduled_date":"datetime64[ns]",
+                "station_code":"string",
+                "station_name":"string",
+                "panel_code":"string",
+                "panel_region":"string",
+                "is_macro_region":"string",
+                "break_type":"string",
+                "broadcaster_spot_number":"string",
+                "campaign_approval_id":"string",
+                "match_group_code":"string",
+                "match_group_name":"string",
+                "buyer_code":"string",
+                "buyer_name":"string",
+                "advertiser_code":"string",
+                "advertiser_name":"string",
+                "holding_company_code":"string",
+                "holding_company_name":"string",
+                "product_code":"string",
+                "product_name":"string",
+                "clearcast_commercial_title":"string",
+                "commercial_spot_length":"string",
+                "clearcast_web_address":"string",
+                "commercial_number":"string",
+                "platforms":"string",
+                "position_in_break":"string",
+                "preceeding_programme_name":"string",
+                "sales_house_name":"string",
+                "sales_house_brand_description":"string",
+                "spot_duration":"string",
+                "barb_reporting_datetime":"string",
+                "barb_polling_datetime":"string",
+                "standard_datetime":"datetime64[ns]",
+                "spot_type":"string",
+                "succeeding_programme_name":"string"
+            }
+            # verify dtypes (verify_dtypes mutates spot_data in place)
+            if len(spot_data) > 0:
+                verify_class = VerifyDtypes()
+                verify_class.verify_dtypes(
+                    data=spot_data
+                    ,column_dtypes=columns_dict
+                )
+            spot_data_frame = pd.DataFrame(data=spot_data, columns=list(columns_dict.keys()))
+            print("data frame:", spot_data_frame)
+
+            if not spot_data_frame.empty:
+                # Format the scheduled_date as a pandas datetime
+                spot_data_frame["scheduled_date"] = pd.to_datetime(
+                    spot_data_frame["scheduled_date"]
+                )
+                # set dtypes
+                spot_data_frame = spot_data_frame.astype(dtype=columns_dict)
+            return spot_data_frame
+        except pd.errors.DataError as error:
+            print(error)
+            raise pd.errors.DataError(
+                error
+            ) from error
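+
+# Illustrative sketch (not part of the API surface): VerifyDtypes replaces
+# values whose Python type does not match the declared column dtype with a
+# safe default, so a malformed payload degrades softly instead of raising
+# later in astype():
+#
+#     rows = VerifyDtypes().verify_dtypes(
+#         data=[{"station_name": 123, "repeat": "yes"}],
+#         column_dtypes={"station_name": "string", "repeat": "bool"},
+#     )
+#     # rows == [{"station_name": None, "repeat": False}]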
"spot_type": str(item.get("spot_type")), + "succeeding_programme_name": str(item.get("succeeding_programme_name")) + } + ) + # Convert the result into a data frame + columns_dict={ + "scheduled_date":"datetime64[ns]", + # "station", + # "panel", + # "spot_schedule" + "station_code":"string", + "station_name":"string", + "panel_code":"string", + "panel_region":"string", + "is_macro_region":"string", + "break_type":"string", + "broadcaster_spot_number":"string", + "campaign_approval_id":"string", + "match_group_code":"string", + "match_group_name":"string", + "buyer_code":"string", + "buyer_name":"string", + "advertiser_code":"string", + "advertiser_name":"string", + "holding_company_code":"string", + "holding_company_name":"string", + "product_code":"string", + "product_name":"string", + "clearcast_commercial_title":"string", + "commercial_spot_length":"string", + "clearcast_web_address":"string", + "commercial_number":"string", + "platforms":"string", + "position_in_break":"string", + "preceeding_programme_name":"string", + # "sales_house":{ + "sales_house_name":"string", + "sales_house_brand_description":"string", + # }, + "spot_duration":"string", + # "spot_start_datetime": { + "barb_reporting_datetime":"string", + "barb_polling_datetime":"string", + "standard_datetime":"datetime64[ns]", + # }, + "spot_type":"string", + "succeeding_programme_name":"string" + } + #verify dtypes + if len(spot_data)>0: + verifry_class = VerifyDtypes() + verifry_class.verify_dtypes( + data=spot_data + ,column_dtypes=columns_dict + ) + spot_data_frame = pd.DataFrame(data=spot_data, columns=list(columns_dict.keys())) + print("data frame:",spot_data_frame) + + if not spot_data_frame.empty: + # Format the transmission_time_period as a pandas datetime + spot_data_frame["scheduled_date"] = pd.to_datetime( + spot_data_frame["scheduled_date"] + ) + #set dtypes + spot_data_frame=spot_data_frame.astype(dtype=columns_dict) + return spot_data_frame + except pd.errors.DataError as error: + print(error) + raise pd.errors.DataError( + error + ) from error + +class VerifyDtypes: + """schema verification""" + + def verify_dtypes(self,data:list[dict],column_dtypes:dict) -> list[dict]: + """process of verifiying and changing data""" + for item,value in enumerate(data): + value:dict + for column, dtype in column_dtypes.items(): + column:str + dtype:str + series = pd.Series(data={"column":value.get(column)},index=["column"]) + if dtype.lower()=="datetime64[ns]": + pd.to_datetime(series) + elif (dtype.lower()=="string" and not( + isinstance(type(value.get(column)),str))): + data[item][column] = None + elif dtype.lower()=="int64" and not( + isinstance(type(value.get(column)),int)): + data[item][column] = 0 + elif dtype.lower()=="bool" and not( + isinstance(type(value.get(column)),bool)): + data[item][column] = False + elif dtype.lower()=="object" and not( + isinstance(type(value.get(column)),list)): + data[item][column]=[] + else: + raise TypeError(f"unknown type {dtype} for '{column}'") + return data diff --git a/Example/helpers/secret_manager.py b/Example/helpers/secret_manager.py new file mode 100644 index 0000000..1d20eb6 --- /dev/null +++ b/Example/helpers/secret_manager.py @@ -0,0 +1,62 @@ +"""modules to import""" +import os +from google.cloud import secretmanager + +class GcpSecretManager: + """Used to create secret and read secrets in the current project + @param secret_id + @param payload + """ + + def __init__(self, secret_id = None, payload = None) -> None: + self.secret_id = secret_id + self.project_id = 
os.getenv('PROJECT_ID') + self.gcp_client = secretmanager.SecretManagerServiceClient() + + + def create_secret(self,payload): + """create secret""" + parent = f'projects/{self.project_id}' + + secret = self.gcp_client.create_secret( + request={ + "parent": parent, + "secret_id": self.secret_id, + "secret": {"replication": {"automatic": {}}}, + } + ) + + self.gcp_client.add_secret_version( + request={"parent": secret.name, "payload": {"data" : payload}} + ) + + def get_secret(self, secret_name): + """ + Args: + secert_name (string): string + + Returns: + string: most recent version of the secret + """ + + secret_name = f"projects/{self.project_id}/secrets/{secret_name}/versions/latest" + + response = self.gcp_client.access_secret_version(name=secret_name) + return response.payload.data.decode("UTF-8") + + + def update_secret(self, secret_name, payload): + """update secret""" + parent = self.gcp_client.secret_path(self.project_id, secret_name) + + payload = payload.encode("UTF-8") + + response = self.gcp_client.add_secret_version(request={ + "parent" : parent + , "payload" : { + "data": payload + } + } + ) + + print(f"Added secret version: {response.name}") \ No newline at end of file diff --git a/Example/main.py b/Example/main.py new file mode 100644 index 0000000..6e0739c --- /dev/null +++ b/Example/main.py @@ -0,0 +1,710 @@ +"""module used for environment variables""" +import os +import json +import datetime as DT +import flask +import functions_framework +import pandas as pd +import yaml +from google.cloud import tasks_v2 +from google.api_core.exceptions import BadRequest +from helpers.big_query_class import BigQueryClass +from helpers.pybarb import BarbAPI + +DATASET = "Barb_Data" + +def default(request:flask.Request= None): + """defaults for all the functions functions""" + end_date = DT.date.today() + end_date -= DT.timedelta(days=10) + start_date = end_date - DT.timedelta(days=17) + + start_date = start_date.strftime('%Y-%m-%d') + end_date = end_date.strftime('%Y-%m-%d') + query_list=[ + "advertising_spots", + #"spot_audience", + "spot_schedule", + "programme_ratings", + "programme_audience", + "programme_schedule", + "viewing", + "audience_by_time", + #"panel_members", + #"households" + ] + if request: + if request.is_json: + request_json = request.get_json(silent=True) + request_json:dict + start_date = request_json.get("start_date",start_date) + end_date = request_json.get("end_date",end_date) + query_list = request_json.get("query_list",query_list) + print(start_date,end_date,query_list) + return end_date, start_date, query_list + +def send_to_tasks( + message_json:str + , task_name:str + , queueurl:str + , queue_name:str = "Barb-queue" + ,prefix="barb"): + """send the next section to tasks""" + task_client = tasks_v2.CloudTasksClient() + queue = task_client.queue_path(os.getenv('PROJECT_ID'), "europe-west2", queue_name) + task ={ + "http_request":{ + "http_method": tasks_v2.HttpMethod.POST + ,"headers":{ + "content-type": 'application/json' + } + ,"oidc_token":{ + "service_account_email" : os.getenv('SERVICE_ACCOUNT') + } + } + } + message_bytes = message_json.encode("utf-8") + task["http_request"]["body"]= message_bytes + #publish message + task_name=f"{str(DT.datetime.now())}_{task_name}" + name = task_client.task_path(os.getenv("PROJECT_ID"), "europe-west2", queue_name, task_name) + name = name.replace(":","-") + name = name.replace(".","_") + name = name.replace(" ","__") + task["name"]= name + task_url = f"https://europe-west2-{os.getenv('PROJECT_ID')}" + task_url+= 
f".cloudfunctions.net/{prefix}-{os.getenv('STAGE')}-{queueurl}" + task["http_request"]["url"] = task_url + task["http_request"]["oidc_token"]["audience"]= task["http_request"]["url"] + print(task) + request = tasks_v2.CreateTaskRequest( + parent = queue + ,task = task + ) + if os.getenv("ENV"): + return request + else: + response = task_client.create_task( + request = request + ,timeout = 20 + ) + return response + +def daterange(date1:str, date2:str, n:int=6) -> list[dict]: + """give a list of start date and end dates in interval 6""" + format_date = "%Y-%m-%d" + start_date = DT.datetime.strptime(date1, format_date) + end_date = DT.datetime.strptime(date2, format_date) + date_array = [] + sudo_start = start_date + while sudo_start <= end_date: + if (sudo_start + DT.timedelta(days=n))>end_date: + sudo_end = end_date + else: + sudo_end = sudo_start + DT.timedelta(days=n) + date_array.append({ + "start_date":sudo_start.strftime('%Y-%m-%d') + ,"end_date": sudo_end.strftime('%Y-%m-%d') + }) + sudo_start+= DT.timedelta(n+1) + return date_array + +@functions_framework.http +def barb_default(request:flask.Request= None): + """main code starts here""" + end_date, start_date, query_list= default(request) + date_range = daterange(start_date,end_date,2) + print(date_range) + + barb_api = BarbAPI() + #list stations + stations = barb_api.list_stations() + print(f"total stations:{len(stations)}") + panels = barb_api.list_panels() + print(f"total panels:{len(panels)}") + #print(panels) + #loop to send to tasks + for dates in date_range: + for item in query_list: + if item in [ + "advertising_spots", + "spot_schedule", + "programme_ratings", + "programme_schedule", + "audience_by_time", + ]: + for station in stations: + message_dict = { + 'item': item + ,"station": station + ,"start_date": dates["start_date"] + ,"end_date": dates["end_date"] + } + message_json = json.dumps(message_dict) + print(message_json) + task_name = f"{item}-request-station_code-{station['station_code']}" + task_name+= f"-{dates['start_date']}--{dates['end_date']}" + if os.getenv("ENV"): + if item=="advertising_spots": + barb_2_request_advertising_spots(message_dict) + elif item=="spot_schedule": + barb_2_request_spot_schedule(message_dict) + elif item=="programme_ratings": + barb_2_request_programme_ratings(message_dict) + elif item=="programme_schedule": + barb_2_request_programme_schedule(message_dict) + elif item=="audience_by_time": + barb_2_request_audience_by_time(message_dict) + else: + send_to_tasks( + message_json = message_json + , queueurl= f"2_request_{item}" + , task_name = task_name + ) + elif item in [ + "spot_audience", + "programme_audience", + "viewing", + "panel_members", + "households" + ]: + for panel in panels: + message_dict = { + 'item': item + ,"panel": panel + ,"start_date": dates["start_date"] + ,"end_date": dates["end_date"] + } + message_json = json.dumps(message_dict) + print(message_json) + task_name = f"{item}-request-panel_code-{panel['panel_code']}" + task_name+= f"-{dates['start_date']}--{dates['end_date']}" + if os.getenv("ENV"): + if item=="spot_audience": + barb_2_request_spot_audience(message_dict) + elif item=="programme_audience": + barb_2_request_programme_audience(message_dict) + elif item=="programme_schedule": + barb_2_request_programme_schedule(message_dict) + elif item=="viewing": + barb_2_request_viewing(message_dict) + #elif item=="panel_members": + # barb_2_request_advertising_spots(message_dict) + #elif item=="households": + # barb_2_request_advertising_spots(message_dict) + else: 
+ send_to_tasks( + message_json = message_json + , queueurl= f"2_request_{item}" + , task_name = task_name + ) + return "sent to tasks" + +@functions_framework.http +def barb_2_request_advertising_spots(request:flask.Request=None): + """ + A request for advertising spots by station + """ + if os.getenv("ENV"): + message_json=request + else: + message_json = request.get_json(silent=True) + print(message_json) + message_json:dict + start_date = message_json.get('start_date') + end_date = message_json.get('end_date') + item = message_json.get('item') + station = message_json.get('station') + + barb_api = BarbAPI() + data_frame = barb_api.advertising_spots( + min_transmission_date=start_date + ,max_transmission_date=end_date + ,station_code=station["station_code"] + ,buyer="phd_media_limited" + ).to_dataframe() + + if data_frame.empty: + return "No Data" + #test_dataframe(data_frame=data_frame) + #return "done" + message_json = bigquery_load_table( + message_json=message_json + , data_frame=data_frame + ) + message_json["columns"]=["date_of_transmission","station_name"] + task_name = f"{item}-delete-{start_date}--{end_date}" + if os.getenv("ENV"): + barb_3_delete_data(message_json) + else: + send_to_tasks( + message_json = json.dumps(message_json) + , queueurl= "3_delete_data" + , task_name = task_name + ) + return "ok" + +@functions_framework.http +def barb_2_request_spot_audience(request:flask.Request=None): + """ + Get a full viewer level details of a specified spot + """ + if os.getenv("ENV"): + message_json=request + else: + message_json = request.get_json(silent=True) + message_json:dict + print(message_json) + start_date = message_json.get('start_date') + end_date = message_json.get('end_date') + #item = message_json.get('item') + panel = message_json.get('panel') + + barb_api = BarbAPI() + data_frame = barb_api.spot_audience( + min_date=start_date + ,max_date=end_date + ,panel_code=panel["panel_code"] + ) + + if not data_frame: + print("no data finish") + return "No data" + print("there is data so download file") + data_frame.download_parquet() + + # if data_frame.empty: + # return "No Data" + # test_dataframe(data_frame=data_frame) + # #return "done" + # message_json = bigquery_load_table( + # message_json=message_json + # , data_frame=data_frame + # ) + # message_json["columns"]=["","panel"] + # task_name = f"{item}-delete-{start_date}--{end_date}" + # if os.getenv("ENV"): + # barb_3_delete_data(message_json) + # else: + # send_to_tasks( + # message_json = json.dumps(message_json) + # , queueurl= "3_delete_data" + # , task_name = task_name + # ) + return "ok" + +@functions_framework.http +def barb_2_request_spot_schedule(request:flask.Request=None): + """ + Get a full viewer level details of a specified spot + """ + if os.getenv("ENV"): + message_json=request + else: + message_json = request.get_json(silent=True) + message_json:dict + print(message_json) + start_date = message_json.get('start_date') + end_date = message_json.get('end_date') + item = message_json.get('item') + station = message_json.get('station') + + barb_api = BarbAPI() + data_frame = barb_api.spot_schedule( + min_date=start_date + ,max_date=end_date + ,station_code=station["station_code"] + ).to_dataframe() + if data_frame.empty: + return "No Data" + #test_dataframe(data_frame=data_frame) + #return "done" + message_json = bigquery_load_table( + message_json=message_json + , data_frame=data_frame + ) + message_json["columns"]=["scheduled_date","station_code"] + task_name = f"{item}-delete-{start_date}--{end_date}" + 
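# Every barb_2_request_* handler follows the same pipeline: read the task
# payload, pull one station/panel/date-window slice from the Barb API, stage
# it into a date-stamped temporary table via bigquery_load_table, then chain
# a "3_delete_data" task. The "columns" list set above is the de-duplication
# key for that delete; a sketch of the SQL it presumably expands to:
#
#   DELETE FROM `<raw_table>`
#   WHERE scheduled_date IN (SELECT DISTINCT scheduled_date FROM `<temp_table>`)
#     AND station_code IN (SELECT DISTINCT station_code FROM `<temp_table>`)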
if os.getenv("ENV"): + barb_3_delete_data(message_json) + else: + send_to_tasks( + message_json = json.dumps(message_json) + , queueurl= "3_delete_data" + , task_name = task_name + ) + return "ok" + +@functions_framework.http +def barb_2_request_programme_ratings(request:flask.Request=None): + """ + Get a full viewer level details of a specified spot + """ + if os.getenv("ENV"): + message_json=request + else: + message_json = request.get_json(silent=True) + message_json:dict + print(message_json) + start_date = message_json.get('start_date') + end_date = message_json.get('end_date') + item = message_json.get('item') + station = message_json.get('station') + + barb_api = BarbAPI() + data_frame = barb_api.programme_ratings( + min_date=start_date + ,max_date=end_date + ,station_code=station["station_code"] + ).to_dataframe() + if data_frame.empty: + return "No Data" + #test_dataframe(data_frame=data_frame) + #return "done" + message_json = bigquery_load_table( + message_json=message_json + , data_frame=data_frame + ) + message_json["columns"]=["date_of_transmission","station_code"] + task_name = f"{item}-delete-{start_date}--{end_date}" + if os.getenv("ENV"): + barb_3_delete_data(message_json) + else: + send_to_tasks( + message_json = json.dumps(message_json) + , queueurl= "3_delete_data" + , task_name = task_name + ) + return "ok" + +@functions_framework.http +def barb_2_request_programme_audience(request:flask.Request=None): + """ + programme_audience + """ + if os.getenv("ENV"): + message_json=request + else: + message_json = request.get_json(silent=True) + message_json:dict + print(message_json) + start_date = message_json.get('start_date') + end_date = message_json.get('end_date') + #item = message_json.get('item') + panel = message_json.get('panel') + + barb_api = BarbAPI() + data_frame = barb_api.programme_audience( + min_date=start_date + ,max_date=end_date + ,panel_code=panel["panel_code"] + ) + print(data_frame) + if not data_frame: + print("no data finish") + return "No data" + print("there is data so download file") + data_frame.download_parquet() + return "ok" + +@functions_framework.http +def barb_2_request_programme_schedule(request:flask.Request=None): + """ + programme_schedule + """ + if os.getenv("ENV"): + message_json=request + else: + message_json = request.get_json(silent=True) + message_json:dict + print(message_json) + start_date = message_json.get('start_date') + end_date = message_json.get('end_date') + item = message_json.get('item') + station = message_json.get('station') + + barb_api = BarbAPI() + data_frame = barb_api.programme_schedule( + min_date=start_date + ,max_date=end_date + ,station_code=station["station_code"] + ).to_dataframe() + print(data_frame) + if data_frame.empty: + print("no data finish") + return "No data" + #test_dataframe(data_frame=data_frame) + #return "done" + message_json = bigquery_load_table( + message_json=message_json + , data_frame=data_frame + ) + message_json["columns"]=["scheduled_date","station_code"] + task_name = f"{item}-delete-{start_date}--{end_date}" + if os.getenv("ENV"): + barb_3_delete_data(message_json) + else: + send_to_tasks( + message_json = json.dumps(message_json) + , queueurl= "3_delete_data" + , task_name = task_name + ) + return "ok" + +@functions_framework.http +def barb_2_request_viewing(request:flask.Request=None): + """ + Get the viewing report data by day and panel. 
+ """ + if os.getenv("ENV"): + message_json=request + else: + message_json = request.get_json(silent=True) + message_json:dict + print(message_json) + start_date = message_json.get('start_date') + end_date = message_json.get('end_date') + #item = message_json.get('item') + panel = message_json.get('panel') + + barb_api = BarbAPI() + data_frame = barb_api.viewing( + min_date=start_date + ,max_date=end_date + ,panel_code=panel["panel_code"] + ) + + if not data_frame: + print("no data finish") + return "No data" + print("there is data so download file") + data_frame.download_parquet() + + # if data_frame.empty: + # return "No Data" + # test_dataframe(data_frame=data_frame) + # #return "done" + # message_json = bigquery_load_table( + # message_json=message_json + # , data_frame=data_frame + # ) + # message_json["columns"]=["","panel"] + # task_name = f"{item}-delete-{start_date}--{end_date}" + # if os.getenv("ENV"): + # barb_3_delete_data(message_json) + # else: + # send_to_tasks( + # message_json = json.dumps(message_json) + # , queueurl= "3_delete_data" + # , task_name = task_name + # ) + return "ok" + +@functions_framework.http +def barb_2_request_audience_by_time(request:flask.Request=None): + """ + Get the audience sizes for various_time periods_by day, station and panel + """ + if os.getenv("ENV"): + message_json=request + else: + message_json = request.get_json(silent=True) + print(message_json) + message_json:dict + start_date = message_json.get('start_date') + end_date = message_json.get('end_date') + item = message_json.get('item') + station = message_json.get('station') + + barb_api = BarbAPI() + # time_period_options=[1,5,15] + # viewing_status_options=["live", "vosdal", "consolidated"] + # for time in time_period_options: + # for status in viewing_status_options: + data_frame = barb_api.audiences_by_time( + min_transmission_date=start_date + ,max_transmission_date=end_date + ,time_period_length=15 + ,viewing_status="consolidated" + ,station_code=station["station_code"] + ).to_dataframe() + + if data_frame.empty: + return "No Data" + #test_dataframe(data_frame=data_frame) + #return "done" + message_json = bigquery_load_table( + message_json=message_json + , data_frame=data_frame + ) + message_json["columns"]=["date_of_transmission","station_code"] + task_name = f"{item}-delete-{start_date}--{end_date}" + if os.getenv("ENV"): + barb_3_delete_data(message_json) + else: + send_to_tasks( + message_json = json.dumps(message_json) + , queueurl= "3_delete_data" + , task_name = task_name + ) + return "ok" + +def test_dataframe(data_frame:pd.DataFrame): + """for testing purposes""" + print(data_frame.dtypes) + for column in data_frame.columns: + print(data_frame[column]) + return "test done" + +def bigquery_load_table(message_json:dict, data_frame:pd.DataFrame) -> None: + """bigquery load data_frame into table""" + start_date = message_json.get('start_date') + end_date = message_json.get('end_date') + item = message_json.get('item') + station = message_json.get('station') + panel = message_json.get('panel') + job_config = message_json.get('job_config') + #bigquery starts now + bigquery_class = BigQueryClass(dataset=DATASET) + if station: + table_name = f"{item}-{station['station_code']}" + elif panel: + table_name = f"{item}-{panel['panel_code']}" + temp_table = bigquery_class.temp_table( + table_name=f"{table_name}-{start_date}--{end_date}" + ) + raw_data_table = bigquery_class.raw_table( + table_name=item + ) + result = bigquery_class.create_table( + raw_data= raw_data_table + , 
temp_data= temp_table
    )
    try:
        result = bigquery_class.load_dataframe_into_table(
            dataframe= data_frame
            , temp_data= temp_table
            , job_config= job_config
        )
    except Exception as error:
        test_dataframe(data_frame)
        raise BadRequest(error) from error
    message_json["temp_table"]= temp_table
    message_json["raw_table"]= raw_data_table
    return message_json

@functions_framework.http
def barb_3_delete_data(request:flask.Request=None):
    """delete this slice's rows from the raw table, keyed on the temp table"""
    if os.getenv("ENV"):
        message_json=request
    else:
        message_json = request.get_json(silent=True)
    message_json:dict
    print(message_json)
    big_query_class = BigQueryClass(dataset=DATASET)

    result = big_query_class.delete_from_table(
        temp_data = message_json.get("temp_table")
        ,raw_data = message_json.get("raw_table")
        ,columns=message_json.get("columns",[])
    )
    task_name = f"INSERT-FROM_{message_json.get('temp_table')}"
    task_name+= f"--{DT.datetime.now()}"
    if os.getenv("ENV"):
        barb_4_insert_data(message_json)
    else:
        send_to_tasks(
            message_json = json.dumps(message_json)
            , queueurl= "4_insert_data"
            , task_name = task_name
            , queue_name= "Barb-bq-queue"
        )
    return "ok"

@functions_framework.http
def barb_4_insert_data(request:flask.Request=None):
    """insert the staged rows from the temp table into the raw table"""
    if os.getenv("ENV"):
        message_json=request
    else:
        message_json = request.get_json(silent=True)
    print(message_json)
    message_json:dict
    big_query_class = BigQueryClass(dataset=DATASET)
    columns = big_query_class.get_columns_from_table(
        table = message_json.get("raw_table")
    )
    result = big_query_class.insert_from_table(
        temp_data= message_json.get("temp_table")
        ,raw_data= message_json.get("raw_table")
        ,columns=columns
    )
    task_name = f"DROP-TABLE_{message_json.get('temp_table')}"
    task_name+= f"--{DT.datetime.now()}"
    if os.getenv("ENV"):
        barb_5_drop_table(message_json)
    else:
        send_to_tasks(
            message_json = json.dumps(message_json)
            , queueurl= "5_drop_table"
            , task_name = task_name
            , queue_name= "Barb-bq-queue"
        )
    return "ok"

@functions_framework.http
def barb_5_drop_table(request:flask.Request=None):
    """drop the temporary table once its rows have been merged"""
    if os.getenv("ENV"):
        message_json=request
    else:
        message_json = request.get_json(silent=True)
    print(message_json)
    message_json:dict
    big_query_class = BigQueryClass(dataset=DATASET)
    result = big_query_class.drop_temp_table(
        temp_table= message_json.get("temp_table")
    )
    return "ok"

if __name__ == '__main__':
    with open("dev.yaml",'r',encoding="UTF-8") as file:
        variables = yaml.safe_load(file)
    os.environ["PROJECT_ID"] = variables["PROJECT_ID"]
    os.environ["VERSION"] = variables["VERSION"]
    os.environ["STAGE"] = variables["STAGE"]
    os.environ["BUCKET"] = variables["BUCKET"]
    os.environ["ENV"] = 'local'
    os.environ["EMAIL"] = variables["EMAIL"]
    os.environ["PASSWORD"] = variables["PASSWORD"]
    class TestRequest:
        """test request for local testing"""
        def __init__(self):
            self.is_json=True

        def get_json(self,force=False, silent=False, cache=True) ->dict:
            """json object to test"""
            json_object={
                "start_date":"2023-12-12"
                ,"end_date": "2023-12-15"
                ,"query_list":[
                    # "advertising_spots", #done
                    # "spot_audience", #done
                    # "spot_schedule", #done
                    # "programme_ratings", #done
                    # "programme_audience", #done
                    "programme_schedule", #done
                    # "viewing", #done
                    # "audience_by_time", #done
                    # "panel_members", #
                    # 
"households" # + ] + } + return json_object + + def useless_f(self): + """to pad class""" + test_request=TestRequest() + barb_default(test_request) + #functions-framework --target=adgroup_request_function --host=localhost --port=8080 diff --git a/pybarb/pybarb.py b/pybarb/pybarb.py index a459263..63f664b 100644 --- a/pybarb/pybarb.py +++ b/pybarb/pybarb.py @@ -1,12 +1,15 @@ -import requests -import json -import pandas as pd -import numpy as np -import sqlalchemy -import plotly.graph_objs as go +"""modules to import""" import re +import os import time +import tempfile +import json +import requests +import pandas as pd +from helpers.secret_manager import GcpSecretManager +from google.cloud import storage +storage_client= storage.Client(project=os.getenv("PROJECT")) class BarbAPI: """ @@ -26,6 +29,7 @@ class BarbAPI: get_panel_code: Gets the panel code for a given panel region. programme_ratings: Gets the programme ratings for a given date range. advertising_spots: Gets the advertising spots for a given date range. + spot_audience: gets the spot audience report data by day and panel audiences_by_time: Gets the audiences by time for a given date range. list_stations: Lists the stations available in the API. list_viewing_stations: Lists the stations available in the API. @@ -37,19 +41,19 @@ class BarbAPI: ping_job_status: Pings the job status. """ - def __init__(self, api_key: str, api_root="https://barb-api.co.uk/api/v1/"): + def __init__(self, api_root:str="https://barb-api.co.uk/api/v1/"): """ Initializes a new instance of the BarbAPI class. Args: - api_key (str): The API key for accessing the Barb API. + api_key (dict): contains "email" and "password". api_root (str): The root URL of the Barb API. """ - self.api_key = api_key self.api_root = api_root self.connected = False - self.headers = None + self.headers = {} + self.headers["Authorization"]=self.connect() self.current_job_id = None def connect(self): @@ -60,15 +64,28 @@ def connect(self): try: # Code to connect to the Barb API self.connected = True - - # Code to connect to the Barb API - self.connected = True + #get secrets + if os.getenv("ENV"): + email = os.getenv("EMAIL") + password = os.getenv("PASSWORD") + else: + secret_manager = GcpSecretManager() + email = secret_manager.get_secret("barb_email") + password = secret_manager.get_secret("barb_password") # Code to get an access token from the Barb API token_request_url = self.api_root + "auth/token/" - response = requests.post(token_request_url, data=self.api_key) + response = requests.post( + token_request_url + , data={ + "email":email, + "password":password + } + ,timeout = 300 + ) + print(response.text) access_token = json.loads(response.text)["access"] - self.headers = {"Authorization": "Bearer {}".format(access_token)} + return f"Bearer {access_token}" except requests.exceptions.RequestException as e: print(f"An error occurred: {e}") @@ -80,32 +97,7 @@ def connect(self): print("Failed to get access token from the response.") self.connected = False - def get_station_code(self, station_name): - """ - Gets the station code for a given station name. - - Args: - station_name (str): The name of the station to query. - - Returns: - str: The station code. 
- """ - - api_url = f"{self.api_root}stations/" - r = requests.get(url=api_url, headers=self.headers) - api_data = r.json() - station_code = [ - s["station_code"] - for s in api_data - if station_name.lower() == s["station_name"].lower() - ] - if len(station_code) == 1: - station_code = station_code[0] - else: - raise Exception(f"Station name {station_name} not found.") - return station_code - - def get_viewing_station_code(self, viewing_station_name): + def get_viewing_station_code(self, viewing_station_name:str) ->dict: """ Gets the viewing_station code for a given viewing_station name. @@ -117,7 +109,12 @@ def get_viewing_station_code(self, viewing_station_name): """ api_url = f"{self.api_root}viewing_stations/" - r = requests.get(url=api_url, headers=self.headers) + r = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) + r.raise_for_status() api_data = r.json() viewing_station_code = [ s["viewing_station_code"] @@ -127,40 +124,15 @@ def get_viewing_station_code(self, viewing_station_name): if len(viewing_station_code) == 1: viewing_station_code = viewing_station_code[0] else: - raise Exception(f"Viewing station name {viewing_station_name} not found.") + raise KeyError(f"Viewing station name {viewing_station_name} not found.") return viewing_station_code - def get_panel_code(self, panel_region): - """ - Gets the panel code for a given panel region. - - Args: - panel_region (str): The name of the panel to query. - - Returns: - str: The panel code. - """ - - api_url = f"{self.api_root}panels/" - r = requests.get(url=api_url, headers=self.headers) - api_data = r.json() - panel_code = [ - s["panel_code"] - for s in api_data - if panel_region.lower() == s["panel_region"].lower() - ] - if len(panel_code) == 1: - panel_code = panel_code[0] - else: - raise Exception(f"Panel name {panel_region} not found.") - return panel_code - def programme_ratings( self, - min_transmission_date, - max_transmission_date, - station=None, - panel=None, + min_date, + max_date, + station_code:str, + panel_code=None, consolidated=True, last_updated_greater_than=None, use_reporting_days=True, @@ -185,10 +157,10 @@ def programme_ratings( # The query parameters params = { - "min_transmission_date": min_transmission_date, - "max_transmission_date": max_transmission_date, - "station_code": None if station is None else self.get_station_code(station), - "panel_code": None if panel is None else self.get_panel_code(panel), + "min_transmission_date": min_date, + "max_transmission_date": max_date, + "station_code": station_code, + "panel_code": panel_code, "consolidated": consolidated, "last_updated_greater_than": last_updated_greater_than, "use_reporting_days": use_reporting_days, @@ -199,12 +171,82 @@ def programme_ratings( return ProgrammeRatingsResultSet(api_response_data) + def programme_schedule( + self, + min_date:str, + max_date:str, + station_code:str, + last_updated_greater_than:str=None, + ): + """ + Gets the programme ratings for a given date range. + + Args: + min_scheduled_date (str): The minimum scheduled date to query. + max_scheduled_date (str): The maximum scheduled date to query. + station (str): The name of the station to query. + last_updated_greater_than (str): The last updated date to query. + + Returns: + ProgrammeScheduleResultSet: The programme ratings result set. 
+ """ + + # The query parameters + params = { + "min_scheduled_date": min_date, + "max_scheduled_date": max_date, + "station_code": station_code, + "last_updated_greater_than": last_updated_greater_than, + } + + api_response_data = self.query_bulk_endpoint("programme_schedule", params) + + return ProgrammeScheduleResultSet(api_response_data) + + def programme_audience( + self, + min_date, + max_date, + panel_code:int, + ): + """ + Gets the programme audience for a given date range. + + Args: + min_sesssion_date (str): The minimum transmission date to query. + max_sesssion_date (str): The maximum transmission date to query. + panel_code (str): The panel code to query. + + Returns: + ProgrammeAudienceResultSet: The programme audience result set. + """ + + # The query parameters + params = { + "min_session_date": min_date, + "max_session_date": max_date, + "panel_code": panel_code, + } + + api_response_data = self.query_bulk_endpoint( + endpoint="bulk/programme_audience", + parameters=params, + method="GET" + ) + if api_response_data is None: + string=f"no data recieved for {panel_code} for {min_date} and {max_date}" + print(string) + # raise Warning(string) + return None + + return GoogleBucketResultSet(api_response_data, endpoint="programme_audience") + def advertising_spots( self, - min_transmission_date, - max_transmission_date, - station=None, - panel=None, + min_transmission_date:str, + max_transmission_date:str, + station_code=None, + panel_code=None, advertiser=None, buyer=None, consolidated=True, @@ -219,8 +261,8 @@ def advertising_spots( Args: min_transmission_date (str): The minimum transmission date to query. max_transmission_date (str): The maximum transmission date to query. - station (str): The name of the station to query. - panel (str): The name of the panel to query. + station_code (str): The code of the station to query. + panel_code (str): The code of the panel to query. advertiser (str): The name of the advertiser to query. buyer (str): The name of the buyer to query. consolidated (bool): Whether to return consolidated data. @@ -237,8 +279,8 @@ def advertising_spots( params = { "min_transmission_date": min_transmission_date, "max_transmission_date": max_transmission_date, - "station_code": None if station is None else self.get_station_code(station), - "panel_code": None if panel is None else self.get_panel_code(panel), + "station_code": station_code, + "panel_code":panel_code, "advertiser_name": advertiser, "buyer_name": buyer, "consolidated": consolidated, @@ -247,19 +289,73 @@ def advertising_spots( "last_updated_greater_than": last_updated_greater_than, "limit": limit, } - api_response_data = self.query_event_endpoint("advertising_spots", params) return AdvertisingSpotsResultSet(api_response_data) + def spot_audience(self, + min_date:str, + max_date:str, + panel_code): + """ + Gets the advertising spots for a given date range. + + Args: + min_transmission_date (str): The minimum transmission date to query. + max_transmission_date (str): The maximum transmission date to query. + panel_code (str): The code of the panel to query. + + Returns: + SpotAudienceResultSet: The advertising spots result set. 
+ """ + + # The query parameters + params = { + "min_session_date": min_date, + "max_session_date": max_date, + "panel_code":panel_code, + } + + api_response_data = self.query_bulk_endpoint( + endpoint="bulk/spot_audience", + parameters=params, + method="GET" + ) + print("api_response_data", api_response_data) + + if api_response_data is None: + string=f"no data recieved for {panel_code} for {min_date} and {max_date}" + #raise Warning(string) + return None + return GoogleBucketResultSet(api_response_data,endpoint="spot_audience") + + def spot_schedule(self + ,min_date:str + ,max_date:str + ,station_code:int + ,last_updated_gt:str=None): + """ + request the spot schedule endpoint + """ + # The query parameters + params = { + "min_scheduled_date": min_date, + "max_scheduled_date": max_date, + "station_code": station_code, + } + if last_updated_gt: + params["last_updated_greater_than"]= last_updated_gt + api_response_data = self.query_bulk_endpoint("spot_schedule", params) + return SpotScheduleResultSet(api_response_data) + def audiences_by_time( self, min_transmission_date, max_transmission_date, time_period_length, viewing_status, - station=None, - panel=None, + station_code=None, + panel_code=None, use_polling_days=True, last_updated_greater_than=None, limit=5000, @@ -287,8 +383,8 @@ def audiences_by_time( params = { "min_transmission_date": min_transmission_date, "max_transmission_date": max_transmission_date, - "station_code": None if station is None else self.get_station_code(station), - "panel_code": None if panel is None else self.get_panel_code(panel), + "station_code": station_code, + "panel_code": panel_code, "time_period_length": time_period_length, "viewing_status": viewing_status, "use_polling_days": use_polling_days, @@ -302,21 +398,21 @@ def audiences_by_time( def viewing( self, - min_session_date, - max_session_date, - viewing_station=None, - panel=None, - activity_type=None, - last_updated_greater_than=None, - output_format="parquet", - limit=5000, + min_date:str, + max_date:str, + panel_code:str, + #viewing_station=None, + #activity_type=None, + #last_updated_greater_than=None, + #output_format="parquet", + #limit=5000, ): """ Gets the viewing for a given date range. Args: - min_session_date (str): The minimum session date to query. - max_session_date (str): The maximum session date to query. + min_date (str): The minimum session date to query. + max_date (str): The maximum session date to query. viewing_station (str): The name of the viewing_station to query. panel (str): The name of the panel to query. activity_type (str): The activity type to query. 
@@ -330,31 +426,37 @@ def viewing( # The query parameters params = { - "min_session_date": min_session_date, - "max_session_date": max_session_date, - "viewing_station_code": None - if viewing_station is None - else self.get_viewing_station_code(viewing_station), - "panel_code": None if panel is None else self.get_panel_code(panel), - "output_format": output_format, - "limit": limit, + "min_session_date": min_date, + "max_session_date": max_date, + "panel_code": panel_code, + # "viewing_station_code": None + # if viewing_station is None + # else self.get_viewing_station_code(viewing_station), + # "output_format": output_format, + # "limit": limit, } - if activity_type is not None: - params["activity_type"] = activity_type - - if last_updated_greater_than is not None: - params["last_updated_greater_than"] = last_updated_greater_than + # if activity_type is not None: + # params["activity_type"] = activity_type - api_response_data = self.query_asynch_endpoint( - "async-batch/viewing/", parameters=params - ) + # if last_updated_greater_than is not None: + # params["last_updated_greater_than"] = last_updated_greater_than - print( - f"{api_response_data['message']} The job id is {api_response_data['job_id']}" + api_response_data = self.query_bulk_endpoint( + endpoint="bulk/viewing/", + parameters=params, + method="GET" ) + if api_response_data is None: + string=f"no data recieved for {panel_code} for {min_date} and {max_date}" + if os.getenv("ENV"): + print(string) + else: + raise Warning(string) + return None + return GoogleBucketResultSet(api_response_data,endpoint="viewing") - def query_event_endpoint(self, endpoint, parameters): + def query_event_endpoint(self, endpoint, parameters,method="GET"): """ Queries the event endpoint. Args: @@ -364,40 +466,57 @@ def query_event_endpoint(self, endpoint, parameters): Returns: dict: The API response data. 
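
        Event endpoints page via the X-Next response header: each page's
        "events" list is accumulated and the loop keeps following X-Next
        until the header disappears. Illustrative shape of the return value:

            {"endpoint": "advertising_spots", "events": [{...}, {...}, ...]}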
""" - + api_response_data = {"endpoint": endpoint, "events": []} try: api_url = f"{self.api_root}{endpoint}" - r = requests.get(url=api_url, params=parameters, headers=self.headers) + r = requests.request( + url=api_url + , params=parameters + , headers=self.headers + , timeout=300 + , method=method + ) + r.raise_for_status() # If the response is not 200 then raise an exception if r.status_code != 200: - raise Exception(f"Error: {r.status_code} - {r.text}") + raise requests.HTTPError(f"Error: {r.status_code} - {r.text}") r_json = r.json() + #print(r_json) # If events is not in the response then raise an exception if "events" not in r_json.keys(): - raise Exception(f"Error: {r_json['message']}") + raise KeyError(f"Error: {r_json['message']}") # If events is empty then raise an exception if len(r_json["events"]) == 0: - raise Exception(f"Error: No events returned.") + return api_response_data api_response_data = {"endpoint": endpoint, "events": r_json["events"]} - while r.headers.__contains__("X-Next"): + count=0 + while "X-Next" in r.headers: x_next_url = r.headers["X-Next"] - r = requests.get(url=x_next_url, headers=self.headers) + r = requests.get( + url=x_next_url + , headers=self.headers + ,timeout=300 + ) + r.raise_for_status() r_json = r.json() api_response_data["events"] = ( api_response_data["events"] + r_json["events"] ) + print(count) + count+=1 return api_response_data except requests.exceptions.RequestException as e: - print(f"An error occurred: {e}") + raise requests.exceptions.RequestException(f"An error occurred: {e}") except json.JSONDecodeError: print("Failed to decode the response.") + return api_response_data def list_stations(self, regex_filter=None): """ @@ -408,19 +527,28 @@ def list_stations(self, regex_filter=None): """ api_url = f"{self.api_root}stations" + try: + api_response_data = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) + api_response_data.raise_for_status() + list_of_stations = [] + for x in api_response_data.json(): + list_of_stations.append(x) - api_response_data = requests.get(url=api_url, headers=self.headers) + if len(list_of_stations) == 0: + raise requests.RequestException("Error: No stations returned.") - list_of_stations = [x["station_name"] for x in api_response_data.json()] + if regex_filter is not None: + regex = re.compile(regex_filter, flags=re.IGNORECASE) + list_of_stations = list(filter(regex.search, list_of_stations)) - if len(list_of_stations) == 0: - raise Exception(f"Error: No stations returned.") - - if regex_filter is not None: - regex = re.compile(regex_filter, flags=re.IGNORECASE) - list_of_stations = list(filter(regex.search, list_of_stations)) - - return list_of_stations + return list_of_stations + except requests.RequestException as req_error: + print(api_response_data.content) + raise requests.RequestException("error") from req_error def list_viewing_stations(self, regex_filter=None): """ @@ -432,12 +560,16 @@ def list_viewing_stations(self, regex_filter=None): api_url = f"{self.api_root}viewing_stations" - api_response_data = requests.get(url=api_url, headers=self.headers) + api_response_data = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) - list_of_stations = [x["viewing_station_name"] for x in api_response_data.json()] + list_of_stations = list(api_response_data.json()) if len(list_of_stations) == 0: - raise Exception(f"Error: No stations returned.") + raise requests.RequestException("Error: No stations returned.") if regex_filter is not None: regex = 
re.compile(regex_filter, flags=re.IGNORECASE) @@ -445,7 +577,7 @@ def list_viewing_stations(self, regex_filter=None): return list_of_stations - def list_panels(self, regex_filter=None): + def list_panels(self, regex_filter=None) ->[list,None]: """ Lists the panels available in the API. @@ -454,12 +586,16 @@ def list_panels(self, regex_filter=None): """ api_url = f"{self.api_root}panels" - api_response_data = requests.get(url=api_url, headers=self.headers) + api_response_data = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) - list_of_panels = [x["panel_region"] for x in api_response_data.json()] + list_of_panels = list(api_response_data.json()) if len(list_of_panels) == 0: - raise Exception(f"Error: No panels returned.") + raise requests.RequestException("Error: No panels returned.") if regex_filter is not None: regex = re.compile(regex_filter, flags=re.IGNORECASE) @@ -476,12 +612,16 @@ def list_buyers(self, regex_filter=None): """ api_url = f"{self.api_root}buyers" - api_response_data = requests.get(url=api_url, headers=self.headers) + api_response_data = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) list_of_buyers = api_response_data.json() if len(list_of_buyers) == 0: - raise Exception(f"Error: No buyers returned.") + raise requests.RequestException("Error: No buyers returned.") if regex_filter is not None: regex = re.compile(regex_filter, flags=re.IGNORECASE) @@ -498,12 +638,16 @@ def list_advertisers(self, regex_filter=None): """ api_url = f"{self.api_root}advertisers" - api_response_data = requests.get(url=api_url, headers=self.headers) + api_response_data = requests.get( + url=api_url + , headers=self.headers + ,timeout=300 + ) list_of_advertisers = [a["advertiser_name"] for a in api_response_data.json()] if len(list_of_advertisers) == 0: - raise Exception(f"Error: No advertisers returned.") + raise requests.RequestException("Error: No advertisers returned.") if regex_filter is not None: regex = re.compile(regex_filter, flags=re.IGNORECASE) @@ -511,7 +655,7 @@ def list_advertisers(self, regex_filter=None): return list_of_advertisers - def query_asynch_endpoint(self, endpoint, parameters): + def query_asynch_endpoint(self, endpoint, parameters,method="POST"): """ Queries the asynch endpoint. 
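# After this hunk, list_stations / list_panels return the full station and
# panel dicts from the API rather than bare name strings, which is what
# barb_default relies on when it reads station["station_code"] and
# panel["panel_code"]. A regex_filter therefore needs to target a field;
# sketch of an equivalent name filter:
#
#   [s for s in stations if re.search("bbc", s.get("station_name", ""), re.IGNORECASE)]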
@@ -527,14 +671,23 @@ def query_asynch_endpoint(self, endpoint, parameters):
         # Query the API and turn the response into json
         try:
-            r = requests.post(url=api_url, json=parameters, headers=self.headers)
+            r = requests.request(
+                url=api_url
+                , json=parameters
+                , headers=self.headers
+                ,timeout=300
+                ,method=method
+            )
+            r.raise_for_status()
             r_json = r.json()
+            print(r_json)
             self.current_job_id = r_json["job_id"]
             return r_json
         except requests.exceptions.RequestException as e:
             print(f"An error occurred: {e}")
         except json.JSONDecodeError:
             print("Failed to decode the response.")
+        return None

     def get_asynch_file_urls(self, job_id=None):
         """
@@ -552,18 +705,24 @@
         try:
             api_url = f"{self.api_root}async-batch/results/{job_id}"
-            r = requests.get(url=api_url, headers=self.headers)
+            r = requests.get(
+                url=api_url
+                , headers=self.headers
+                ,timeout=300
+            )
+            r.raise_for_status()
             r_json = r.json()
             if r_json["status"] == "started":
                 return False
             urls = [x["data"] for x in r_json["result"]]
             if len(urls) == 0:
-                raise Exception(f"Error: No urls returned.")
+                raise IndexError("Error: No urls returned.")
             return urls
         except requests.exceptions.RequestException as e:
             print(f"An error occurred: {e}")
         except json.JSONDecodeError:
             print("Failed to decode the response.")
+        return None

     def get_asynch_files(self):
         """
@@ -579,8 +738,10 @@
                 df = pd.read_parquet(file)
                 results = pd.concat([results, df])
             return ViewingResultSet(results)
-        except:
+        except pd.errors.DataError as error:
             print("Failed to get the asynch files.")
+            print(error)
+            return None

     def ping_job_status(self, job_id=None):
         """
@@ -606,6 +767,55 @@
             f"Job complete. {len(self.current_file_urls)} files are ready for download."
         )

+    def query_bulk_endpoint(self, endpoint:str, parameters:dict,method="GET")->list[dict]:
+        """
+        Queries a bulk endpoint, following X-Next pagination.
+
+        Args:
+            endpoint (str): The endpoint to query.
+            parameters (dict): The query parameters.
+
+        Returns:
+            list[dict]: The accumulated API response data.
+        """
+
+        api_url = f"{self.api_root}{endpoint}"
+        data_list=[]
+        next_token=True
+        # Query the API and turn the response into json
+        try:
+            while next_token:
+                r = requests.request(
+                    url = api_url
+                    , params = parameters
+                    , headers = self.headers
+                    ,timeout = 300
+                    ,method = method
+                )
+                r.raise_for_status()
+                r_json = r.json()
+                #print("json output:",r_json)
+                #append data
+                if isinstance(r_json, list):
+                    for item in r_json:
+                        data_list.append(item)
+                elif isinstance(r_json,dict):
+                    data_list.append(r_json)
+                else:
+                    raise TypeError(f"Wrong data type for {r_json}")
+                #continue?
+                if "X-Next" in r.headers:
+                    api_url = r.headers["X-Next"]
+                else:
+                    next_token=False
+            return data_list
+        except requests.exceptions.RequestException as e:
+            print(r.content)
+            print(r.url)
+            raise Warning(f"An error occurred: {e}") from e
+        except json.JSONDecodeError:
+            print("Failed to decode the response.")
+        return None

 class APIResultSet:
     """
@@ -622,139 +832,154 @@
     def __init__(self, api_response_data: dict):

         self.api_response_data = api_response_data

-    def to_dataframe(self):
-        """
-        Converts the API response data into a pandas dataframe.
-
-        Returns:
-            pandas.DataFrame: A dataframe containing the API response data.
-
-        """
-        raise NotImplementedError()
-
-    def to_csv(self, file_name):
-        """
-        Saves the API response data as a CSV file.
-
-        Args:
-            file_name (str): The name of the CSV file to save.
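# query_bulk_endpoint drives the bulk/ methods above, accumulating JSON pages
# into one flat list while following X-Next, mirroring query_event_endpoint.
# Illustrative call with the parameters spot_schedule builds:
#
#   data = self.query_bulk_endpoint(
#       "spot_schedule",
#       {"min_scheduled_date": "2023-12-12",
#        "max_scheduled_date": "2023-12-14",
#        "station_code": 123},
#   )   # -> list[dict], or None on a decode failure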
- """ - self.to_dataframe().to_csv(file_name, index=False) - - def to_excel(self, file_name): - """ - Saves the API response data as an Excel file. - - Args: - file_name (str): The name of the Excel file to save. - """ - self.to_dataframe().to_excel(file_name, index=False) - - def to_json(self, file_name): +class BulkResultSet: + """ + Respresents the bulk result set from the Barb API + """ + def __init__(self, api_response_data: list[dict]): """ - Saves the API response data as a JSON file. + Initialises a new instance of the BulkResultSet class. Args: - file_name (str): The name of the JSON file to save. + api_response_data (list[dict]): The API response data. """ - with open(file_name, "w") as f: - json.dump(self.api_response_data, f) - def to_sql(self, connection_string, table_name, if_exists="replace"): - """ - Saves the API response data as a SQL table. + self.api_response_data = api_response_data - Args: - connection_string (str): The connection string to the SQL database. - table_name (str): The name of the SQL table to save. - if_exists (str): The action to take if the SQL table already exists. - """ - df = self.to_dataframe() - engine = sqlalchemy.create_engine(connection_string) - df.to_sql(table_name, engine, if_exists=if_exists, index=False) +class ProgrammeRatingsResultSet(APIResultSet): + """ + Represents a programme ratings result set from the Barb API. + """ - def audience_pivot(self): + def to_dataframe(self) ->pd.DataFrame: """ - Converts the API response data into a pandas dataframe with the audience names as columns. + Converts the API response data into a pandas dataframe. Returns: - pandas.DataFrame: A dataframe containing the API response data with the audience names as columns. - - """ - df = self.to_dataframe() - entity = ( - "programme_name" - if "programme_name" in df.columns - else "clearcast_commercial_title" - if "clearcast_commercial_title" in df.columns - else "activity" - ) - df = pd.pivot_table( - df, - index=["panel_region", "station_name", "date_of_transmission", entity], - columns="audience_name", - values="audience_size_hundreds", - aggfunc="sum", - ).fillna(0) - return df + pandas.DataFrame: A dataframe containing the API response data. - def ts_plot(self, filter=None): """ - Creates a plotly time series plot of the API response data. - Returns: - plotly.graph_objs.Figure: A plotly time series plot. 
+ # if len(self.api_response_data["events"]) == 0: + # raise Warning("Error: No events returned.") - """ - - df = self.audience_pivot().reset_index() - traces = [] - for i, col in enumerate(df.columns[4:]): - # set the visible attribute to False for all traces except the first one - visible = i == 0 - traces.append( - go.Scatter( - x=df["date_of_transmission"], y=df[col], name=col, visible=visible + # Loop through the events and then the audiences within the events + df_data = [] + for e in self.api_response_data["events"]: + try: + e:dict + df_data.append( + { + "panel_code": e.get("panel",{}).get("panel_code"), + "panel_region": e.get("panel",{}).get("panel_region"), + "is_macro_region": e.get("panel",{}).get("is_macro_region"), + "station_code": e.get("station",{}).get("station_code"), + "station_name": e.get("station",{}).get("station_name"), + "prog_name": e.get("transmission_log_programme_name"), + "programme_type": e.get("programme_type"), + "programme_start_datetime": e.get("programme_start_datetime",{}).get( + "standard_datetime" + ), + "programme_duration_minutes": e.get("programme_duration"), + "spans_normal_day": e.get("spans_normal_day"), + "sponsor_code": e.get("sponsor",{}).get("sponsor_code"), + "bumpers_included": e.get("sponsor",{}).get("bumpers_included"), + "broadcaster_transmission_code": e.get("broadcaster_transmission_code"), + "live_status": e.get("live_status"), + "uk_premiere": e.get("uk_premier"), + "broadcaster_premiere": e.get("broadcaster_premier"), + "programme_repeat": e.get("repeat"), + "episode_name": None if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("episode",{}).get( + "episode_name" + ), + "episode_number": -1 if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("episode",{}).get( + "episode_number" + ), + "series_number": -1 if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("series",{}).get( + "series_number" + ), + "number_of_episodes": -1 if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("series",{}).get( + "number_of_episodes" + ), + "broadcaster_series_id": -1 if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("series",{}).get( + "broadcaster_series_id" + ), + "genre": None if not(isinstance(e.get( + "programme_content"),dict)) else e.get( + "programme_content",{}).get("genre"), + "platforms": e.get("platforms",[]), + "audience_views": e.get("audience_views",[{}]), + } ) + except AttributeError as error: + print(e) + raise AttributeError(error) from error + except KeyError as error: + print(e) + raise KeyError(error) from error + # Convert the result into a data frame + columns_dict={ + "panel_code": "string", + "panel_region": "string", + "is_macro_region": "bool", + "station_code": "string", + "station_name": "string", + "prog_name": "string", + "programme_type": "string", + "programme_start_datetime": "datetime64[ns]", + "programme_duration_minutes": "int64", + "spans_normal_day": "bool", + "sponsor_code": "string", + "bumpers_included": "bool", + "broadcaster_transmission_code": "string", + "live_status": "string", + "uk_premiere": "bool", + "broadcaster_premiere": "bool", + "programme_repeat": "bool", + "episode_name": "string", + "episode_number": "string", + "series_number": "string", + "number_of_episodes": "string", + "broadcaster_series_id": "string", + "genre": "string", + "platforms": "object", + 
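# columns_dict serves double duty: list(columns_dict.keys()) fixes the column
# order of the frame, and the same mapping is handed to DataFrame.astype once
# the frame is non-empty, so every run yields a stable schema for the
# BigQuery load. Sketch of the pattern used below:
#
#   df = pd.DataFrame(df_data, columns=list(columns_dict))
#   if not df.empty:
#       df = df.astype(dtype=columns_dict)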
"audience_views": "object", + #"date_of_transmission": "datetime64[ns]" + } + #verify dtypes + if len(df_data)>0: + verifry_class = VerifyDtypes() + df_data = verifry_class.verify_dtypes( + data = df_data + ,column_dtypes = columns_dict ) + df = pd.DataFrame(df_data, columns=list(columns_dict.keys())) - # create the layout for the chart - layout = go.Layout( - title="Audience by time", - xaxis=dict(title="Date"), - yaxis=dict(title="Value"), - showlegend=False, - updatemenus=[ - dict( - buttons=[ - { - "label": col, - "method": "update", - "args": [ - {"visible": [col == trace.name for trace in traces]} - ], - } - for col in df.columns[4:] - ], - direction="down", - # showactive=True - ) - ], - ) - - # create the figure with the traces and layout - fig = go.Figure(data=traces, layout=layout) - - # display the chart - return fig + if not df.empty: + # Format the transmission_time_period as a pandas datetime + df["programme_start_datetime"] = pd.to_datetime(df["programme_start_datetime"]) + df["date_of_transmission"] = df["programme_start_datetime"].dt.date + #dtypes for each column + df=df.astype(dtype=columns_dict) + return df -class ProgrammeRatingsResultSet(APIResultSet): +class ProgrammeScheduleResultSet(BulkResultSet): """ Represents a programme ratings result set from the Barb API. """ - def to_dataframe(self): + def to_dataframe(self) ->pd.DataFrame: """ Converts the API response data into a pandas dataframe. @@ -763,74 +988,125 @@ def to_dataframe(self): """ - if len(self.api_response_data["events"]) == 0: - raise Exception(f"Error: No events returned.") + # if len(self.api_response_data["events"]) == 0: + # raise Warning("Error: No events returned.") # Loop through the events and then the audiences within the events - df = [] + df_data = [] try: - for e in self.api_response_data["events"]: - # Handle the possibility of a null programme_content - prog_name = e["transmission_log_programme_name"].title() - episode_name = None - episode_number = None - genre = None - if e["programme_content"] is not None: - prog_name = e["programme_content"]["content_name"] - if "episode" in e["programme_content"].keys(): - episode_name = e["programme_content"]["episode"]["episode_name"] - episode_number = e["programme_content"]["episode"][ - "episode_number" - ] - if "genre" in e["programme_content"].keys(): - genre = e["programme_content"]["genre"] - - for v in e["audience_views"]: - df.append( - { - "panel_region": e["panel"]["panel_region"], - "station_name": e["station"]["station_name"], - "programme_name": prog_name, - "programme_type": e["programme_type"], - "programme_start_datetime": e["programme_start_datetime"][ - "standard_datetime" - ], - "programme_duration_minutes": e["programme_duration"], - "spans_normal_day": e["spans_normal_day"], - "uk_premiere": e["uk_premier"], - "broadcaster_premiere": e["broadcaster_premier"], - "programme_repeat": e["repeat"], - "episode_number": episode_number, - "episode_name": episode_name, - "genre": genre, - "audience_name": v["description"], - "audience_size_hundreds": v["audience_size_hundreds"], - "audience_target_size_hundreds": v[ - "target_size_in_hundreds" - ], - } - ) - except: - print( - "Failed to convert the API response data into a dataframe. 
Check the API response data for anomalies" - ) + for item in self.api_response_data: + item:dict + for e in item.get("station_schedule",{}): + e:dict + df_data.append( + { + "scheduled_date": item.get("scheduled_date"), + "station_code": item.get("station",{}).get( + "station_code" + ), + "station_name": item.get("station",{}).get( + "station_name" + ), + "panel_code": item.get("panel",{}).get("panel_code"), + "panel_region": item.get("panel",{}).get("panel_region"), + "is_macro_region": item.get("panel",{}).get( + "is_macro_region" + ), + "broadcaster_premier": e.get("broadcaster_premier"), + "broadcaster_transmission_code": e.get("broadcaster_transmission_code"), + "live_status": e.get("live_status"), + "platforms": e.get("platforms",[]), + "content_name": e.get("programme_content").get("content_name"), + "barb_content_id": e.get("programme_content").get("barb_content_id"), + "broadcaster_content_id": e.get("programme_content").get( + "broadcaster_content_id"), + "metabroadcast_content_id": e.get("programme_content").get( + "metabroadcast_information").get("metabroadcast_content_id"), + "episode_number": e.get("programme_content").get( + "episode").get("episode_number"), + "episode_name": e.get("programme_content").get( + "episode").get("episode_name"), + "series_number": e.get("programme_content").get( + "series").get("series_number"), + "number_of_episodes": e.get("programme_content").get( + "series").get("number_of_episodes"), + "broadcaster_series_id": e.get("programme_content").get( + "series").get("broadcaster_series_id"), + "genre": e.get("programme_content").get("genre"), + "programme_duration": e.get("programme_duration"), + "barb_reporting_datetime": e.get("programme_start_datetime").get( + "barb_reporting_datetime"), + "barb_polling_datetime": e.get("programme_start_datetime").get( + "barb_polling_datetime"), + "standard_datetime": e.get("programme_start_datetime").get( + "standard_datetime"), + "programme_type": e.get("programme_type"), + "repeat": e.get("repeat"), + "spans_normal_day": e.get("spans_normal_day"), + "sponsor_code": e.get("sponsor").get("sponsor_code"), + "bumpers_included": e.get("sponsor").get("bumpers_included"), + "transmission_log_programme_name": e.get("transmission_log_programme_name"), + "uk_premier": e.get("uk_premier") + } + ) + except KeyError as error: + print(error) # Convert the result into a data frame - df = pd.DataFrame(df) + columns_dict={ + "scheduled_date": "datetime64[ns]", + "station_code": "string", + "station_name": "string", + "panel_code": "string", + "panel_region": "string", + "is_macro_region": "bool", + "broadcaster_premier": "bool", + "broadcaster_transmission_code": "string", + "live_status": "string", + "platforms": "object", + "content_name": "string", + "barb_content_id": "string", + "broadcaster_content_id": "string", + "metabroadcast_content_id": "string", + "episode_number": "int64", + "episode_name": "string", + "series_number": "string", + "number_of_episodes": "int64", + "broadcaster_series_id": "string", + "genre": "string", + "programme_duration": "int64", + "barb_reporting_datetime": "string", + "barb_polling_datetime": "string", + "standard_datetime": "datetime64[ns]", + "programme_type": "string", + "repeat": "bool", + "spans_normal_day": "bool", + "sponsor_code": "string", + "bumpers_included": "bool", + "transmission_log_programme_name": "string", + "uk_premier":"bool" + } + #verify dtypes + if len(df_data)>0: + verifry_class = VerifyDtypes() + verifry_class.verify_dtypes( + data=df_data + 
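# VerifyDtypes is not defined in this diff; it is assumed here to coerce each
# record's values toward the declared dtype map (e.g. filling None with a
# typed default) so the astype() call further down cannot fail on mixed-type
# columns. Hypothetical contract:
#
#   verify_dtypes(data: list[dict], column_dtypes: dict[str, str]) -> list[dict]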
,column_dtypes=columns_dict + ) + df = pd.DataFrame(df_data, columns=list(columns_dict.keys())) - # Format the transmission_time_period as a pandas datetime - df["programme_start_datetime"] = pd.to_datetime(df["programme_start_datetime"]) - df["date_of_transmission"] = df["programme_start_datetime"].dt.date + if not df.empty: + #dtypes for each column + df = df.astype(dtype=columns_dict) return df - class AdvertisingSpotsResultSet(APIResultSet): """ Represents an advertising spots result set from the Barb API. """ - def to_dataframe(self): + def to_dataframe(self) ->pd.DataFrame: """ Converts the API response data into a pandas dataframe. @@ -839,75 +1115,80 @@ def to_dataframe(self): """ - if len(self.api_response_data["events"]) == 0: - raise Exception(f"Error: No events returned.") + #if len(self.api_response_data["events"]) == 0: + # raise Exception("Error: No events returned.") try: # Loop through the events and then the audiences within the events spot_data = [] - for e in self.api_response_data["events"]: - for v in e["audience_views"]: - spot_data.append( - { - "panel_region": e["panel"]["panel_region"], - "station_name": e["station"]["station_name"], - "spot_type": e["spot_type"], - "spot_start_datetime": e["spot_start_datetime"][ - "standard_datetime" - ], - "spot_duration": e["spot_duration"], - "preceding_programme_name": e["preceding_programme_name"], - "succeeding_programme_name": e["succeeding_programme_name"], - "break_type": e["break_type"], - "position_in_break": e["position_in_break"], - "broadcaster_spot_number": e["broadcaster_spot_number"], - "commercial_number": e["commercial_number"], - "clearcast_commercial_title": e["clearcast_information"][ - "clearcast_commercial_title" - ] - if e["clearcast_information"] is not None - else None, - "clearcast_match_group_code": e["clearcast_information"][ - "match_group_code" - ] - if e["clearcast_information"] is not None - else None, - "clearcast_match_group_name": e["clearcast_information"][ - "match_group_name" - ] - if e["clearcast_information"] is not None - else None, - "clearcast_buyer_code": e["clearcast_information"][ - "buyer_code" - ] - if e["clearcast_information"] is not None - else None, - "clearcast_buyer_name": e["clearcast_information"][ - "buyer_name" - ] - if e["clearcast_information"] is not None - else None, - "clearcast_advertiser_code": e["clearcast_information"][ - "advertiser_code" - ] - if e["clearcast_information"] is not None - else None, - "clearcast_advertiser_name": e["clearcast_information"][ - "advertiser_name" - ] - if e["clearcast_information"] is not None - else None, - "campaign_approval_id": e["campaign_approval_id"], - "sales_house_name": e["sales_house"]["sales_house_name"], - "audience_name": v["description"], - "audience_size_hundreds": v["audience_size_hundreds"], - "audience_target_size_hundreds": v[ - "target_size_in_hundreds" - ], - } - ) + for e in self.api_response_data.get("events",[{}]): + dict_event = dict(e) + spot_data.append( + { + "panel_region": e["panel"]["panel_region"], + "panel_code": dict_event["panel"]["panel_code"], + "station_name": dict_event["station"]["station_name"], + "station_code": dict_event["station"]["station_code"], + "spot_type": dict_event["spot_type"], + "spot_start_datetime": dict_event.get("spot_start_datetime",{}).get( + "standard_datetime"), + "spot_duration": dict_event["spot_duration"], + "preceding_programme_name": dict_event["preceding_programme_name"], + "succeeding_programme_name": dict_event["succeeding_programme_name"], + "break_type": 
dict_event["break_type"], + "position_in_break": dict_event["position_in_break"], + "broadcaster_spot_number": dict_event["broadcaster_spot_number"], + "commercial_number": dict_event["commercial_number"], + "clearcast_commercial_title": dict_event.get( + "clearcast_information",{}).get( + "clearcast_commercial_title",None), + "clearcast_match_group_code": dict_event.get( + "clearcast_information",{}).get( + "match_group_code",None), + "clearcast_match_group_name": dict_event.get( + "clearcast_information",{}).get( + "match_group_name",None), + "clearcast_buyer_code": dict_event.get("clearcast_information",{}).get( + "buyer_code",None), + "clearcast_buyer_name": dict_event.get("clearcast_information",{}).get( + "buyer_name",None), + "clearcast_advertiser_code": dict_event.get("clearcast_information",{}).get( + "advertiser_code",None), + "clearcast_advertiser_name": dict_event.get("clearcast_information",{}).get( + "advertiser_name",None), + "campaign_approval_id": dict_event["campaign_approval_id"], + "sales_house_name": dict_event.get("sales_house",{}).get( + "sales_house_name"), + "audience_views": dict_event.get("audience_views",[{}]), + } + ) # Convert the result into a data frame - spot_data = pd.DataFrame(spot_data) + columns=[ + "panel_region", + "panel_code", + "station_name", + "station_code", + "spot_type", + "spot_start_datetime", + "spot_duration", + "preceding_programme_name", + "succeeding_programme_name", + "break_type", + "position_in_break", + "broadcaster_spot_number", + "commercial_number", + "clearcast_commercial_title", + "clearcast_match_group_code", + "clearcast_match_group_name", + "clearcast_buyer_code", + "clearcast_buyer_name", + "clearcast_advertiser_code", + "clearcast_advertiser_name", + "campaign_approval_id", + "sales_house_name", + "audience_views" + ] + spot_data = pd.DataFrame(data=spot_data, columns=columns) # Format the transmission_time_period as a pandas datetime spot_data["spot_start_datetime"] = pd.to_datetime( @@ -915,19 +1196,51 @@ def to_dataframe(self): ) spot_data["date_of_transmission"] = spot_data["spot_start_datetime"].dt.date + #set dtypes + if not spot_data.empty: + spot_data=self.dataframe_set_dtypes(spot_data) return spot_data - except: - print( - "Failed to convert the API response data into a dataframe. Check the API response data for anomalies" - ) - + except pd.errors.DataError as error: + print(error) + return None + + def dataframe_set_dtypes(self,dataframe:pd.DataFrame): + """sets the dtypes for the columns""" + dtypes_dict={ + "panel_region":"string", + "panel_code":"int64", + "station_name":"string", + "station_code":"int64", + "spot_type":"string", + #"spot_start_datetime":"datetime64[ns]", + "spot_duration":"int64", + "preceding_programme_name":"string", + "succeeding_programme_name":"string", + "break_type":"string", + "position_in_break":"string", + "broadcaster_spot_number":"string", + "commercial_number":"string", + "clearcast_commercial_title":"string", + "clearcast_match_group_code":"string", + "clearcast_match_group_name":"string", + "clearcast_buyer_code":"string", + "clearcast_buyer_name":"string", + "clearcast_advertiser_code":"string", + "clearcast_advertiser_name":"string", + "campaign_approval_id":"string", + "sales_house_name":"string", + #"audience_views":"object", + #"date_of_transmission":"datetime64[ns]" + } + dataframe=dataframe.astype(dtype=dtypes_dict) + return dataframe class AudiencesByTimeResultSet(APIResultSet): """ Represents an audiences by time result set from the Barb API. 
""" - def to_dataframe(self): + def to_dataframe(self) -> pd.DataFrame: """ Converts the API response data into a pandas dataframe. @@ -936,161 +1249,300 @@ def to_dataframe(self): """ - if len(self.api_response_data["events"]) == 0: - raise Exception(f"Error: No events returned.") + # if len(self.api_response_data.get("events",[]) == 0: + # raise Exception("Error: No events returned.") try: # Loop through the events and then the audiences within the events audience_data = [] - for e in self.api_response_data["events"]: - for v in e["audience_views"]: + for e in self.api_response_data.get("events",[]): + e:dict + for v in e.get("audience_views",[]): + v:dict audience_data.append( { - "panel_region": e["panel"]["panel_region"], - "station_name": e["station"]["station_name"], - "date_of_transmission": e["date_of_transmission"], - "activity": e["activity"], - "transmission_time_period_start": e[ - "transmission_time_period_start" - ]["standard_datetime"], - "audience_name": v["description"], - "audience_size_hundreds": v["audience_size_hundreds"], - "audience_target_size_hundreds": v[ + "date_of_transmission": e.get("date_of_transmission"), + "panel_code": e.get("panel",{}).get("panel_code"), + "panel_region": e.get("panel",{}).get("panel_region"), + "is_macro_region": e.get("panel",{}).get("is_macro_region"), + "station_code": e.get("station",{}).get("station_code"), + "station_name": e.get("station",{}).get("station_name"), + "activity": e.get("activity"), + "transmission_time_period_duration_mins": e.get( + "transmission_time_period_duration_mins"), + "transmission_time_period_start": e.get( + "transmission_time_period_start",{} + ).get("standard_datetime"), + "platforms": e.get("platforms"), + "audience_code": v.get("audience_code"), + "audience_size_hundreds": v.get("audience_size_hundreds"), + "category_id": v.get("category_id"), + "audience_name": v.get("description"), + "audience_target_size_hundreds": v.get( "target_size_in_hundreds" - ], + ), } ) # Convert the result into a data frame - - audience_data = pd.DataFrame(audience_data) + columns_dict={ + "date_of_transmission": "datetime64[ns]", + "panel_code": "string", + "panel_region": "string", + "is_macro_region": "bool", + "station_code": "string", + "station_name": "string", + "activity": "string", + "transmission_time_period_duration_mins": "int64", + "transmission_time_period_start": "datetime64[ns]", + "platforms": "object", + "audience_code": "string", + "audience_size_hundreds": "int64", + "category_id": "string", + "audience_name": "string", + "audience_target_size_hundreds": "int64", + } + #verify dtypes + if len(audience_data)>0: + verifry_class = VerifyDtypes() + audience_data = verifry_class.verify_dtypes( + data = audience_data + ,column_dtypes = columns_dict + ) + audience_data = pd.DataFrame(audience_data, columns=list(columns_dict.keys())) # Format the transmission_time_period as a pandas datetime - audience_data["transmission_time_period_start"] = pd.to_datetime( - audience_data["transmission_time_period_start"] - ) + if not audience_data.empty: + audience_data["transmission_time_period_start"] = pd.to_datetime( + audience_data["transmission_time_period_start"] + ) + audience_data = audience_data.astype(dtype=columns_dict) return audience_data - except: - print( - "Failed to convert the API response data into a dataframe. Check the API response data for anomalies" - ) - - -class ViewingResultSet(APIResultSet): - def __init__(self, api_response_data): - """ - Initialises a new instance of the APIResultSet class. 
- - Args: - api_response_data (dict): The API response data. - """ - - if api_response_data.shape[0] == 0: - raise Exception(f"Error: No events returned.") - - try: - bool_columns = ["TARGETED_PROMOTION", "SKY_ULTRA_HD"] - api_response_data[bool_columns] = api_response_data[bool_columns].astype( - bool - ) - - json_columns = [ - "SESSION_START", - "SESSION_END", - "HOUSEHOLD", - "DEVICE", - "PANEL_VIEWERS", - "GUEST_VIEWERS", - "PROGRAMMES_VIEWED", - "SPOTS_VIEWED", - "PANEL", - "VIEWING_STATION", - "START_OF_RECORDING", - "VOD_PROVIDER", - ] - - for column in json_columns: - api_response_data[column] = api_response_data[column].apply(json.loads) - - self.api_response_data = api_response_data - - except: - print( - "Failed to decode the nested json data. Check the API response data for anomalies" - ) - - def to_dataframe(self, unpack=None): + except pd.errors.DataError as error: + print(error) + return None + +class GoogleBucketResultSet(BulkResultSet): + """Represents spot Audience result set from the Barb API""" + + def __init__(self, api_response_data: list[dict], endpoint:str): + """initalise class""" + super().__init__(api_response_data) + self.endpoint =endpoint + + def download_parquet(self) -> None: + """download the parquet""" + print("response_data",self.api_response_data) + for results_item in self.api_response_data: + print("results_item",results_item) + date = results_item.get("session_date") + panel = results_item.get("panel_code") + for i,link in enumerate(results_item.get("results",[])): + print(link) + with tempfile.TemporaryDirectory() as td: + r=requests.get(link,timeout=300) + r.raise_for_status() + print(r.headers) + with open(f"{td}/demo_{i}.parquet",mode="wb") as f: + f.write(r.content) + print(f.name) + self.parquet_to_google_bucket( + file=f"{f.name}" + ,date=date + ,panel_code=panel + ) + return "ok" + + def parquet_to_google_bucket(self, file:str, date:str, panel_code:int) -> None: + """upload the parquet to the google bucket""" + bucket_name = os.getenv("BUCKET") + bucket = storage_client.get_bucket(bucket_name) + bucket = storage_client.bucket(bucket_name) + result = re.search("(demo_)[0-9]*.parquet$",file) + file_name= result[0] + blob = bucket.blob(f"{self.endpoint}/date={date}/panel_code={panel_code}/{file_name}") + blob.upload_from_filename(filename=file) + print("file uploaded") + return "ok" + +class SpotScheduleResultSet(BulkResultSet): + """ + return dataframe + """ + def to_dataframe(self) ->pd.DataFrame: """ Converts the API response data into a pandas dataframe. - Args: - unpack (list): The columns to unpack - Returns: pandas.DataFrame: A dataframe containing the API response data. 
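# A hypothetical driver for GoogleBucketResultSet, assuming the PROJECT and
# BUCKET environment variables are set and that the asynch endpoint returned
# items shaped like the dict below (all values here are invented).
import os

from helpers.pybarb import GoogleBucketResultSet  # assumed import path

os.environ.setdefault("BUCKET", "example-bucket")  # upload destination

response_items = [
    {
        "session_date": "2024-01-01",
        "panel_code": 50,
        # pre-signed download URLs as returned by the API
        "results": ["https://example.com/signed/demo.parquet"],
    }
]

result_set = GoogleBucketResultSet(response_items, endpoint="spot_audience")
result_set.download_parquet()
# Each file lands at
# gs://example-bucket/spot_audience/date=2024-01-01/panel_code=50/demo_0.parquet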
- """ - try: - if set(unpack) == set(["viewers", "programmes"]): - data_as_dict = self.api_response_data.to_dict(orient="records") - rows = [] - for item in data_as_dict: - row = {} - row.update(item["HOUSEHOLD"]) - row.update(item["DEVICE"]) - - for programme in item["PROGRAMMES_VIEWED"]: - for viewer in item["PANEL_VIEWERS"]: - inner_row = {} - inner_row.update( - { - "session_start_datetime": item["SESSION_START"][ - "standard_datetime" - ] - } - ) - if "programme_start_datetime" in programme.keys(): - inner_row.update( - { - "programme_start_datetime": programme[ - "programme_start_datetime" - ]["standard_datetime"] - } - ) - inner_row.update( - { - "programme_name": programme["programme_content"][ - "content_name" - ] - } - ) - inner_row.update(viewer) - inner_row.update(row) - rows.append(inner_row) - - # Drop all columns from df with datatype that is a dict - - df = pd.DataFrame(rows) - - # If it exists, drop the column tv_set_properties - for column in ["tv_set_properties", "panel_member_weights"]: - if column in df.columns: - df = df.drop(columns=[column]) - - df = df.drop_duplicates() - - return df - except: - print( - "Failed to convert the API response data into a dataframe. Check the API response data for anomalies" - ) - - def to_json(self, file_name): """ - Converts the API response data into a json object. - Returns: - json: A json containing the API response data. - """ + #if len(self.api_response_data["events"]) == 0: + # raise Exception("Error: No events returned.") + + try: + # Loop through the events and then the audiences within the events + spot_data = [] + #print(self.api_response_data) + for e in self.api_response_data: + for item in e.get("spot_schedule",[{}]): + item:dict + platforms=[] + #print("platform type:",type(item.get("platforms",[]))) + for platfrom in item.get("platforms",[]): + platforms.append(str(platfrom)) + spot_data.append( + { + "scheduled_date":e.get("scheduled_date"), + "station_code":str(e.get("station",{}).get("station_code",{})), + "station_name":str(e.get("station",{}).get("station_name",{})), + "panel_code":str(e.get("panel",{}).get("panel_code",{})), + "panel_region":str(e.get("panel",{}).get("panel_region",{})), + "is_macro_region":str(e.get("panel",{}).get("is_macro_region",{})), + "break_type": str(item.get("break_type")), + "broadcaster_spot_number": str(item.get("broadcaster_spot_number")), + "campaign_approval_id": str(item.get("campaign_approval_id")), + "match_group_code": str(item["clearcast_information"].get( + "match_group_code")), + "match_group_name": str(item["clearcast_information"].get( + "match_group_name")), + "buyer_code": str(item["clearcast_information"].get("buyer_code")), + "buyer_name": str(item["clearcast_information"].get("buyer_name")), + "advertiser_code": str(item["clearcast_information"].get( + "advertiser_code")), + "advertiser_name": str(item["clearcast_information"].get( + "advertiser_name")), + "holding_company_code": str(item["clearcast_information"].get( + "holding_company_code")), + "holding_company_name": str(item["clearcast_information"].get( + "holding_company_name")), + "product_code": str(item["clearcast_information"].get( + "product_code")), + "product_name": str(item["clearcast_information"].get( + "product_name")), + "clearcast_commercial_title": str(item["clearcast_information"].get( + "clearcast_commercial_title")), + "commercial_spot_length": str(item["clearcast_information"].get( + "commercial_spot_length")), + "clearcast_web_address": str(item["clearcast_information"].get( + 
"clearcast_web_address")), + "commercial_number": str(item.get("commercial_number")), + "platforms": json.dumps(platforms), + "position_in_break": str(item.get("position_in_break")), + "preceeding_programme_name": str(item.get("preceeding_programme_name")), + # "sales_house":{ + "sales_house_name": str(item["sales_house"].get( + "sales_house_name")), + "sales_house_brand_description": str(item["sales_house"].get( + "sales_house_brand_description")), + # }, + "spot_duration": str(item.get("spot_duration")), + # "spot_start_datetime": { + "barb_reporting_datetime": item["spot_start_datetime"].get( + "barb_reporting_datetime"), + "barb_polling_datetime": item["spot_start_datetime"].get( + "barb_polling_datetime"), + "standard_datetime": item["spot_start_datetime"].get( + "standard_datetime"), + # }, + "spot_type": str(item.get("spot_type")), + "succeeding_programme_name": str(item.get("succeeding_programme_name")) + } + ) + # Convert the result into a data frame + columns_dict={ + "scheduled_date":"datetime64[ns]", + # "station", + # "panel", + # "spot_schedule" + "station_code":"string", + "station_name":"string", + "panel_code":"string", + "panel_region":"string", + "is_macro_region":"string", + "break_type":"string", + "broadcaster_spot_number":"string", + "campaign_approval_id":"string", + "match_group_code":"string", + "match_group_name":"string", + "buyer_code":"string", + "buyer_name":"string", + "advertiser_code":"string", + "advertiser_name":"string", + "holding_company_code":"string", + "holding_company_name":"string", + "product_code":"string", + "product_name":"string", + "clearcast_commercial_title":"string", + "commercial_spot_length":"string", + "clearcast_web_address":"string", + "commercial_number":"string", + "platforms":"string", + "position_in_break":"string", + "preceeding_programme_name":"string", + # "sales_house":{ + "sales_house_name":"string", + "sales_house_brand_description":"string", + # }, + "spot_duration":"string", + # "spot_start_datetime": { + "barb_reporting_datetime":"string", + "barb_polling_datetime":"string", + "standard_datetime":"datetime64[ns]", + # }, + "spot_type":"string", + "succeeding_programme_name":"string" + } + #verify dtypes + if len(spot_data)>0: + verifry_class = VerifyDtypes() + verifry_class.verify_dtypes( + data=spot_data + ,column_dtypes=columns_dict + ) + spot_data_frame = pd.DataFrame(data=spot_data, columns=list(columns_dict.keys())) + print("data frame:",spot_data_frame) - self.api_response_data.to_json(file_name, orient="records") + if not spot_data_frame.empty: + # Format the transmission_time_period as a pandas datetime + spot_data_frame["scheduled_date"] = pd.to_datetime( + spot_data_frame["scheduled_date"] + ) + #set dtypes + spot_data_frame=spot_data_frame.astype(dtype=columns_dict) + return spot_data_frame + except pd.errors.DataError as error: + print(error) + raise pd.errors.DataError( + error + ) from error + +class VerifyDtypes: + """schema verification""" + + def verify_dtypes(self,data:list[dict],column_dtypes:dict) -> list[dict]: + """process of verifiying and changing data""" + for item,value in enumerate(data): + value:dict + for column, dtype in column_dtypes.items(): + column:str + dtype:str + series = pd.Series(data={"column":value.get(column)},index=["column"]) + if dtype.lower()=="datetime64[ns]": + pd.to_datetime(series) + elif (dtype.lower()=="string" and not( + isinstance(type(value.get(column)),str))): + data[item][column] = None + elif dtype.lower()=="int64" and not( + 
isinstance(type(value.get(column)),int)): + data[item][column] = 0 + elif dtype.lower()=="bool" and not( + isinstance(type(value.get(column)),bool)): + data[item][column] = False + elif dtype.lower()=="object" and not( + isinstance(type(value.get(column)),list)): + data[item][column]=[] + else: + raise TypeError(f"unknown type {dtype} for '{column}'") + return data
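# A small, invented illustration of the VerifyDtypes contract: values whose
# Python type does not match the declared dtype are replaced with a safe
# default (None / 0 / False / []) so a later astype() call cannot fail.
from helpers.pybarb import VerifyDtypes  # assumed import path

rows = [{"station_name": 123, "spot_duration": "thirty", "is_macro_region": "yes"}]
dtypes = {"station_name": "string", "spot_duration": "int64", "is_macro_region": "bool"}

cleaned = VerifyDtypes().verify_dtypes(data=rows, column_dtypes=dtypes)
print(cleaned)
# -> [{'station_name': None, 'spot_duration': 0, 'is_macro_region': False}]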