Refacto/import api by program (#164)
* wip

* wip

* refacto: import only specific programs

* poetry lock
polomarcus authored Apr 25, 2024
1 parent 48baa18 commit 5afdf33
Showing 17 changed files with 319 additions and 219 deletions.
5 changes: 2 additions & 3 deletions docker-compose.yml
@@ -113,8 +113,8 @@ services:
       PORT: 5050 # healthcheck
       HEALTHCHECK_SERVER: "0.0.0.0"
       # SENTRY_DSN: prod_only
-      UPDATE: "true" # to batch update PG
-      UPDATE_PROGRAM_ONLY: "true" # to batch update PG but only channel with program
+      #UPDATE: "true" # to batch update PG
+      #UPDATE_PROGRAM_ONLY: "true" # to batch update PG but only channel with program
       # START_OFFSET: 100 # to batch update PG from a offset
       # START_DATE: 1704576615 # to test batch import
       CHANNEL : france2 # to reimport only one channel
@@ -124,7 +124,6 @@ services:
       KEYWORDS_URL: https://keywords.mediatree.fr/api/subtitle/ # https://keywords.mediatree.fr/docs/#api-Subtitle-SubtitleList
       MODIN_ENGINE: ray
       MODIN_CPUS: 4 # "https://modin.readthedocs.io/en/0.11.0/using_modin.html#reducing-or-limiting-the-resources-modin-can-use"
-      TZ: 'Europe/Paris'
     volumes:
       - ./quotaclimat/:/app/quotaclimat/
       - ./postgres/:/app/postgres/
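Both batch-update flags are now commented out by default, so re-labelling existing Postgres rows becomes opt-in. The actual flag wiring lives outside this diff; the sketch below is hypothetical, assuming plain `os.environ` lookups whose names simply mirror the docker-compose keys above.

```python
import os

# Hypothetical sketch only: how the batch-update flags could be read at startup.
# The real entrypoint is not shown in this diff.
UPDATE = os.environ.get("UPDATE", "false") == "true"
UPDATE_PROGRAM_ONLY = os.environ.get("UPDATE_PROGRAM_ONLY", "false") == "true"

if UPDATE or UPDATE_PROGRAM_ONLY:
    # update_keywords(session, program_only=UPDATE_PROGRAM_ONLY) re-labels stored
    # rows; program_only skips keyword re-detection (see update_pg_keywords.py below).
    pass
```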
135 changes: 68 additions & 67 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion pyproject.toml
@@ -27,7 +27,6 @@ tenacity = "^8.2.3"
 sentry-sdk = "^1.44.1"
 coverage = "^7.4.2"
 modin = {extras = ["ray"], version = "^0.29.0"}
-dask-expr = "^1.0.12"

 [build-system]
 requires = ["poetry-core>=1.1"]
87 changes: 43 additions & 44 deletions quotaclimat/data_processing/mediatree/api_import.py
@@ -81,25 +81,31 @@ async def get_and_save_api_data(exit_event):
     token=get_auth_token(password=password, user_name=USER)
     type_sub = 's2t'

-    (start_date_to_query, end_epoch) = get_start_end_date_env_variable_with_default()
-
+    (start_date_to_query, end_date) = get_start_end_date_env_variable_with_default()
+    df_programs = get_programs()
     channels = get_channels()

-    range = get_date_range(start_date_to_query, end_epoch)
-    logging.info(f"Number of date to query : {len(range)}")
-    for date in range:
-        token = refresh_token(token, date)
+    day_range = get_date_range(start_date_to_query, end_date)
+    logging.info(f"Number of days to query : {len(day_range)} - day_range : {day_range}")
+    for day in day_range:
+        token = refresh_token(token, day)

-        date_epoch = get_epoch_from_datetime(date)
-        logging.info(f"Date: {date} - {date_epoch}")
-        for channel in channels :
+        for channel in channels:
             try:
-                df = extract_api_sub(token, channel, type_sub, date_epoch)
-                if(df is not None):
-                    # must ._to_pandas() because modin to_sql is not working
-                    save_to_pg(df._to_pandas(), keywords_table, conn)
-                else:
-                    logging.info("Nothing to save to Postgresql")
+                programs_for_this_day = get_programs_for_this_day(day, channel, df_programs)
+
+                for index, program in programs_for_this_day.iterrows():
+                    start_epoch = program['start']
+                    end_epoch = program['end']
+                    channel_program = program['program_name']
+                    channel_program_type = program['program_type']
+                    logging.info(f"Querying API for {channel} - {channel_program} - {channel_program_type} - {start_epoch} - {end_epoch}")
+                    df = extract_api_sub(token, channel, type_sub, start_epoch, end_epoch, channel_program, channel_program_type)
+                    if(df is not None):
+                        # must ._to_pandas() because modin to_sql is not working
+                        save_to_pg(df._to_pandas(), keywords_table, conn)
+                    else:
+                        logging.info("Nothing to save to Postgresql")
             except Exception as err:
                 logging.error(f"continuing loop but met error : {err}")
                 continue
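The loop above now expands each (day, channel) pair into one API call per scheduled program, instead of fixed hourly windows. A minimal sketch of the dataframe shape it consumes; the rows are illustrative only (the real ones come from `get_programs_for_this_day` in channel_program.py below):

```python
import pandas as pd  # plain pandas for illustration; the pipeline uses modin.pandas

# Illustrative per-day program table: column names match the loop above.
programs_for_this_day = pd.DataFrame([
    {"start": 1704430800, "end": 1704432600,
     "program_name": "Le 6h info", "program_type": "Information - Journal"},
    {"start": 1704481200, "end": 1704483900,
     "program_name": "Le 20h", "program_type": "Information - Journal"},
])

for index, program in programs_for_this_day.iterrows():
    # one subtitle query per program window instead of hourly pagination
    print(program["program_name"], program["start"], program["end"])
```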
@@ -126,32 +132,28 @@ def get_auth_token(password=password, user_name=USER):
     except Exception as err:
         logging.error("Could not get token %s:(%s) %s" % (type(err).__name__, err))

-# @TODO filter by keyword when api is updated (to filter first on the API side instead of filter_and_tag_by_theme )
-# see : https://keywords.mediatree.fr/docs/#api-Subtitle-SubtitleList
-def get_param_api(token, type_sub, start_epoch, channel = 'm6', page = 0):
-    number_of_hours = get_hour_frequency()
-    one_hour = 3600
+def get_param_api(token, type_sub, start_epoch, channel, end_epoch):
+    epoch_5min_margin = 300
     return {
         "channel": channel,
         "token": token,
-        "start_gte": start_epoch,
-        "start_lte": start_epoch + (one_hour * number_of_hours), # Start date lower or equal
+        "start_gte": int(start_epoch) - epoch_5min_margin,
+        "start_lte": int(end_epoch) + epoch_5min_margin,
         "type": type_sub,
-        "size": "1000", # range 1-1000
-        # "from": page TODO fix me
+        "size": "1000" # range 1-1000
     }


 # "Randomly wait up to 2^x * 1 seconds between each retry until the range reaches 60 seconds, then randomly up to 60 seconds afterwards"
 # @see https://github.com/jd/tenacity/tree/main
 @retry(wait=wait_random_exponential(multiplier=1, max=60),stop=stop_after_attempt(7))
-def get_post_request(media_tree_token, type_sub, start_epoch, channel, page: int = 0):
+def get_post_request(media_tree_token, type_sub, start_epoch, channel, end_epoch):
     try:
-        params = get_param_api(media_tree_token, type_sub, start_epoch, channel, page)
-        logging.info(f"Query {KEYWORDS_URL} with params:\n {get_param_api('fake_token_for_log', type_sub, start_epoch, channel, page)}")
+        params = get_param_api(media_tree_token, type_sub, start_epoch, channel, end_epoch)
+        logging.info(f"Query {KEYWORDS_URL} with params:\n {get_param_api('fake_token_for_log', type_sub, start_epoch, channel, end_epoch)}")
         response = requests.post(KEYWORDS_URL, json=params)
-        if response.status_code == 401:
-            logging.warning(f"401 - Expired token - retrying to get a new one {response.content}")
+        if response.status_code >= 400:
+            logging.warning(f"{response.status_code} - Expired token ? - retrying to get a new one {response.content}")
             media_tree_token = get_auth_token(password, USER)
             raise Exception

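`get_param_api` now derives the query window from the program's own start and end epochs, padded by a five-minute margin on each side, rather than stepping forward a fixed number of hours. A quick worked example with illustrative epochs:

```python
# Illustrative values: a program running 19:00:00-19:45:00 UTC on 2024-01-05.
start_epoch = 1704481200
end_epoch = 1704483900

params = get_param_api("fake_token", "s2t", start_epoch, "france2", end_epoch)
assert params["start_gte"] == 1704481200 - 300  # 1704480900, 5 min before start
assert params["start_lte"] == 1704483900 + 300  # 1704484200, 5 min after end
```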
@@ -160,19 +162,12 @@ def get_post_request(media_tree_token, type_sub, start_epoch, channel, page: int = 0):
         logging.error("Retry - Could not query API :(%s) %s" % (type(err).__name__, err))
         raise Exception

-# use API pagination to be sure to query all data from a date
-def get_number_of_page_by_channel(media_tree_token, type_sub, start_epoch, channel) -> int:
-    logging.info("get how many pages we have to parse from the API (number_pages)")
-    response_sub = get_post_request(media_tree_token, type_sub, start_epoch, channel)
-
-    return parse_number_pages(response_sub)
-
 @retry(wait=wait_random_exponential(multiplier=1, max=60),stop=stop_after_attempt(7))
-def get_df_api(media_tree_token, type_sub, start_epoch, channel, page):
+def get_df_api(media_tree_token, type_sub, start_epoch, channel, end_epoch, channel_program, channel_program_type):
     try:
-        response_sub = get_post_request(media_tree_token, type_sub, start_epoch, channel, page)
+        response_sub = get_post_request(media_tree_token, type_sub, start_epoch, channel, end_epoch)

-        return parse_reponse_subtitle(response_sub, channel)
+        return parse_reponse_subtitle(response_sub, channel, channel_program, channel_program_type)
     except Exception as err:
         logging.error("Retry - get_df_api:(%s) %s" % (type(err).__name__, err))
         raise Exception
@@ -184,15 +179,16 @@ def extract_api_sub(
     channel = 'm6',
     type_sub = "s2t",
     start_epoch = None,
-    page = 0
+    end_epoch = None
+    ,channel_program: str = ""
+    ,channel_program_type : str = ""
 ) -> Optional[pd.DataFrame]:
     try:
-        df = get_df_api(media_tree_token, type_sub, start_epoch, channel, page)
+        df = get_df_api(media_tree_token, type_sub, start_epoch, channel, end_epoch, channel_program, channel_program_type)

         if(df is not None):
             df = filter_and_tag_by_theme(df)
             df["id"] = add_primary_key(df)
-            df = add_channel_program(df)
             return df
         else:
             None
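The call shape after this hunk, mirroring the positional call in `get_and_save_api_data` above; epochs and program names are illustrative only:

```python
# Hedged sketch: arguments follow the call site shown earlier in this diff.
df = extract_api_sub(token, "france2", "s2t",
                     1704481200, 1704483900,
                     "Le 20h", "Information - Journal")
# The returned frame already carries channel_program / channel_program_type
# (set in parse_reponse_subtitle), so the old add_channel_program(df)
# post-processing step is gone.
```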
@@ -213,7 +209,7 @@ def parse_total_results(response_sub) -> int :
 def parse_number_pages(response_sub) -> int :
     return int(response_sub.get('number_pages'))

-def parse_reponse_subtitle(response_sub, channel = None) -> Optional[pd.DataFrame]:
+def parse_reponse_subtitle(response_sub, channel = None, channel_program = "", channel_program_type = "") -> Optional[pd.DataFrame]:
     with sentry_sdk.start_transaction(op="task", name="parse_reponse_subtitle"):
         total_results = parse_total_results(response_sub)
         logging.getLogger("modin.logging.default").setLevel(logging.WARNING)
@@ -224,11 +220,14 @@ def parse_reponse_subtitle(response_sub, channel = None) -> Optional[pd.DataFrame]:
         logging.debug("Schema from API before formatting :\n%s", new_df.dtypes)
         new_df.drop('channel.title', axis=1, inplace=True) # keep only channel.name

-        new_df['timestamp'] = pd.to_datetime(new_df['start'], unit='s', utc=True).dt.tz_convert('Europe/Paris')
+        new_df['timestamp'] = pd.to_datetime(new_df['start'], unit='s', utc=True)
         new_df.drop('start', axis=1, inplace=True) # keep only channel.name

         new_df.rename(columns={'channel.name':'channel_name', 'channel.radio': 'channel_radio', 'timestamp':'start'}, inplace=True)

+        new_df['channel_program'] = channel_program
+        new_df['channel_program_type'] = channel_program_type
+
         log_dataframe_size(new_df, channel)

         logging.debug("Parsed Schema\n%s", new_df.dtypes)
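Subtitle start times are now kept in UTC rather than converted to Europe/Paris at parse time (the `TZ` variable was dropped from docker-compose accordingly); conversion to local time happens only where a consumer needs it. A small sketch of the difference, assuming an epoch field like the API's `start`:

```python
import pandas as pd  # plain pandas for illustration; the module uses modin.pandas

epoch = 1704483900  # 2024-01-05 19:45:00 UTC

# Before: stored tz-converted to Europe/Paris.
before = pd.to_datetime(epoch, unit='s', utc=True).tz_convert('Europe/Paris')
# After: stored as UTC; consumers convert when matching local programs.
after = pd.to_datetime(epoch, unit='s', utc=True)

print(before)  # 2024-01-05 20:45:00+01:00
print(after)   # 2024-01-05 19:45:00+00:00
```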
67 changes: 53 additions & 14 deletions quotaclimat/data_processing/mediatree/channel_program.py
@@ -1,6 +1,13 @@
 import modin.pandas as pd
 import logging
 import os
+from datetime import datetime
+
+from quotaclimat.data_processing.mediatree.utils import get_epoch_from_datetime
+
+def format_hour_minute(time: str) -> pd.Timestamp:
+    date_str = "1970-01-01"
+    return pd.to_datetime(date_str + " " + time)

 def get_programs():
     logging.debug("Getting program tv/radio...")
@@ -9,8 +16,8 @@ def get_programs():
         json_file_path = os.path.join(current_dir, 'channel_program.json')
         df_programs = pd.read_json(json_file_path, lines=True)

-        df_programs['start'] = pd.to_datetime(df_programs['start'], format='%H:%M').dt.tz_localize('Europe/Paris')
-        df_programs['end'] = pd.to_datetime(df_programs['end'], format='%H:%M').dt.tz_localize('Europe/Paris')
+        df_programs['start'] = format_hour_minute(df_programs['start'])
+        df_programs['end'] = format_hour_minute(df_programs['end'])
     except (Exception) as error:
         logging.error("Could not read channel_program.json", error)
         raise Exception
@@ -44,7 +51,7 @@ def compare_weekday(df_program_weekday, start_weekday: int) -> bool:

 def get_hour_minute(time: pd.Timestamp):
     # http://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.tz_localize.html
-    start_time = pd.to_datetime(time.strftime("%H:%M"), format="%H:%M").tz_localize('Europe/Paris')
+    start_time = format_hour_minute(time.strftime("%H:%M"))
     logging.debug(f"time was {time} now is start_time subtitle {start_time}")

     return start_time
@@ -55,37 +62,69 @@ def get_day_of_week(time: pd.Timestamp):
     logging.debug(f"start_weekday subtitle {start_weekday}")
     return start_weekday

-def get_program_with_start_timestamp(df_program: pd.DataFrame, start_time: pd.Timestamp, channel_name: str):
+def get_matching_program_hour(df_program: pd.DataFrame, start_time: pd.Timestamp):
+    start_time = get_hour_minute(start_time)
+    return df_program[
+        (df_program['start'] <= start_time) &
+        (df_program['end'] > start_time) # stricly > to avoid overlapping programs
+    ]
+
+def get_matching_program_weekday(df_program: pd.DataFrame, start_time: pd.Timestamp, channel_name: str):
     start_weekday = get_day_of_week(start_time)

     df_program["weekday_mask"] = df_program['weekday'].apply(
         lambda x: compare_weekday(x, start_weekday)
-    )
-
+    )
     matching_rows = df_program[
         (df_program['channel_name'] == channel_name) &
-        df_program["weekday_mask"] &
-        (df_program['start'] <= start_time) &
-        (df_program['end'] > start_time) # stricly > to avoid overlapping programs
+        df_program["weekday_mask"] == True
     ]
     matching_rows.drop(columns=['weekday_mask'], inplace=True)
+    matching_rows.drop(columns=['weekday'], inplace=True)
+
+    if matching_rows.empty:
+        logging.warn(f"Program tv : no matching rows found {channel_name} for weekday {start_weekday} - {start_time}")
+
+    return matching_rows
+
+def get_a_program_with_start_timestamp(df_program: pd.DataFrame, start_time: pd.Timestamp, channel_name: str):
+    matching_rows = get_matching_program_weekday(df_program, start_time, channel_name)
+    matching_rows = get_matching_program_hour(matching_rows, start_time)

     if(len(matching_rows) > 1):
-        logging.error(f"Several programs name for the same channel and time {channel_name} and {start_time} / weekday {start_weekday} - {matching_rows}")
-
+        logging.error(f"Several programs name for the same channel and time {channel_name} and {start_time} - {matching_rows}")
     if not matching_rows.empty:
         logging.debug(f"matching_rows {matching_rows}")
         return matching_rows.iloc[0]['program_name'], matching_rows.iloc[0]['program_type']
     else:
-        logging.warn(f"Program tv : no matching rows found {channel_name} for weekday {start_weekday} - {start_time}")
+        logging.info(f"no programs found for {channel_name} - {start_time}")
         return "", ""

 def process_subtitle(row, df_program):
-    channel_program, channel_program_type = get_program_with_start_timestamp(df_program, row['start'], row['channel_name'])
+    channel_program, channel_program_type = get_a_program_with_start_timestamp(df_program, row['start'], row['channel_name'])
     row['channel_program'] = channel_program
     row['channel_program_type'] = channel_program_type
     return row

 def merge_program_subtitle(df_subtitle: pd.DataFrame, df_program: pd.DataFrame):
     merged_df = df_subtitle.apply(lambda subtitle : process_subtitle(subtitle, df_program), axis=1)

-    return merged_df
+    return merged_df
+
+def set_day_with_hour(programs_of_a_day, day: datetime):
+    programs_of_a_day['start'] = programs_of_a_day['start'].apply(lambda dt: dt.replace(year=day.year, month=day.month, day=day.day))
+    programs_of_a_day['end'] = programs_of_a_day['end'].apply(lambda dt: dt.replace(year=day.year, month=day.month, day=day.day))
+    return programs_of_a_day
+
+def get_programs_for_this_day(day: datetime, channel_name: str, df_program: pd.DataFrame):
+    start_time = pd.Timestamp(day)
+
+    programs_of_a_day = get_matching_program_weekday(df_program, start_time, channel_name)
+    programs_of_a_day = set_day_with_hour(programs_of_a_day, day)
+
+    programs_of_a_day[['start', 'end']] = programs_of_a_day.apply(lambda row: pd.Series({
+        'start': get_epoch_from_datetime(row['start'].tz_localize("Europe/Paris")),
+        'end': get_epoch_from_datetime(row['end'].tz_localize("Europe/Paris"))
+    }), axis=1)
+    logging.info(f"Program of {channel_name} : {programs_of_a_day}")
+    return programs_of_a_day
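Putting the new helpers together: given a day and a channel, `get_programs_for_this_day` filters the weekly grid by weekday and channel, stamps the 1970-anchored hours onto the real date, then converts to Europe/Paris epochs. A sketch with a hypothetical grid row; field names follow the code above, but the `weekday` encoding is an assumption (whatever `compare_weekday` expects):

```python
from datetime import datetime
import pandas as pd  # plain pandas for illustration; the module uses modin.pandas

# Hypothetical channel_program.json entry, already parsed by get_programs().
df_program = pd.DataFrame([{
    "channel_name": "france2",
    "weekday": "*",                        # illustrative: every day
    "program_name": "Le 20h",
    "program_type": "Information - Journal",
    "start": format_hour_minute("20:00"),  # 1970-01-01 20:00:00
    "end": format_hour_minute("20:45"),
}])

day = datetime(2024, 1, 5)
programs = get_programs_for_this_day(day, "france2", df_program)
# programs['start'] / programs['end'] are now epochs for
# 2024-01-05 20:00/20:45 Europe/Paris (19:00/19:45 UTC).
```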
2 changes: 1 addition & 1 deletion quotaclimat/data_processing/mediatree/detect_keywords.py
@@ -124,7 +124,7 @@ def filter_keyword_with_same_timestamp(keywords_with_timestamp: List[dict])-> List[dict]:
     final_result = len(result)

     if final_result < number_of_keywords:
-        logging.info(f"Filtering keywords {final_result} out of {number_of_keywords} | {keywords_with_timestamp} with final result {result}")
+        logging.debug(f"Filtering keywords {final_result} out of {number_of_keywords} | {keywords_with_timestamp} with final result {result}")

     return result
9 changes: 4 additions & 5 deletions quotaclimat/data_processing/mediatree/update_pg_keywords.py
@@ -7,7 +7,7 @@
 from sqlalchemy.orm import Session
 from postgres.schemas.models import Keywords
 from quotaclimat.data_processing.mediatree.detect_keywords import *
-from quotaclimat.data_processing.mediatree.channel_program import get_programs, get_program_with_start_timestamp
+from quotaclimat.data_processing.mediatree.channel_program import get_programs, get_a_program_with_start_timestamp
 from sqlalchemy import func, select, delete

 def update_keywords(session: Session, batch_size: int = 50000, start_offset : int = 0, program_only=False) -> list:
@@ -19,7 +19,7 @@ def update_keywords(session: Session, batch_size: int = 50000, start_offset : int = 0, program_only=False) -> list:
         current_batch_saved_keywords = get_keywords_columns(session, i, batch_size)
         logging.info(f"Updating {len(current_batch_saved_keywords)} elements from {i} offsets - batch size {batch_size}")
         for keyword_id, plaintext, keywords_with_timestamp, number_of_keywords, start, srt, theme, channel_name in current_batch_saved_keywords:
-            program_name, program_name_type = get_program_with_start_timestamp(df_programs, start, channel_name)
+            program_name, program_name_type = get_a_program_with_start_timestamp(df_programs, pd.Timestamp(start).tz_convert('Europe/Paris'), channel_name)

             if(not program_only):
                 try:
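The stored `start` comes back labeled as UTC (see the `func.timezone('UTC', ...)` column below) and is converted to Europe/Paris before program matching; `tz_convert` requires a tz-aware input, which is why the value is wrapped in `pd.Timestamp` first. For example:

```python
import pandas as pd  # plain pandas for illustration

start = pd.Timestamp('2024-01-05 19:00:00+00:00')  # as labeled by the query
local = start.tz_convert('Europe/Paris')
print(local)  # 2024-01-05 20:00:00+01:00, the hour used to match programs
```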
@@ -93,7 +93,7 @@ def get_keywords_columns(session: Session, page: int = 0, batch_size: int = 5000
         Keywords.plaintext,
         Keywords.keywords_with_timestamp,
         Keywords.number_of_keywords,
-        func.timezone('UTC', Keywords.start).label('start'), # Stored as Europe/Pris inside PG
+        func.timezone('UTC', Keywords.start).label('start'),
         Keywords.srt,
         Keywords.theme,
         Keywords.channel_name,
@@ -158,8 +158,7 @@ def update_keyword_row(session: Session,
 def update_keyword_row_program(session: Session,
                     keyword_id: int,
                     channel_program: str,
-                    channel_program_type: str,
-                    ):
+                    channel_program_type: str):
     session.query(Keywords).filter(Keywords.id == keyword_id).update(
         {
             Keywords.channel_program: channel_program,

1 comment on commit 5afdf33

@github-actions


Coverage

Coverage Report
File                                  Stmts   Miss   Cover   Missing
postgres
  insert_data.py                         44      7     84%   36–38, 57–59, 64
  insert_existing_data_example.py        19      3     84%   25–27
postgres/schemas
  models.py                             110     15     86%   101–108, 119–120, 152–162
quotaclimat/data_ingestion
  scrap_sitemap.py                      134     17     87%   27–28, 33–34, 66–71, 95–97, 138–140, 202, 223–228
quotaclimat/data_ingestion/ingest_db
  ingest_sitemap_in_db.py                55     37     33%   21–42, 45–58, 62–73
quotaclimat/data_ingestion/scrap_html
  scrap_description_article.py           36      3     92%   19–20, 32
quotaclimat/data_processing/mediatree
  api_import.py                         184    110     40%   42–46, 51–63, 67–70, 76, 79–112, 118–133, 137–138, 151–163, 167–173, 186–197, 200–204, 210, 237–238, 242, 246–265, 268–270
  channel_program.py                     91      9     90%   21–23, 34–36, 50, 86, 95
  config.py                              15      2     87%   7, 16
  detect_keywords.py                    193      4     98%   197, 250–252
  update_pg_keywords.py                  44      5     89%   41–43, 78, 162
  utils.py                               66     23     65%   18, 29–53, 56, 65, 81–82
quotaclimat/utils
  healthcheck_config.py                  29     14     52%   22–24, 27–38
  logger.py                              24     11     54%   22–24, 28–37
  sentry.py                              10      2     80%   21–22
TOTAL                                 1080    262     76%

Tests: 80   Skipped: 0 💤   Failures: 0 ❌   Errors: 0 🔥   Time: 1m 29s ⏱️
