Skip to content

Commit

Permalink
Feat/number of keywords (#104)
Browse files Browse the repository at this point in the history
* feat: overlap number_of_keywords

* includes distance time keyword into parsing

* wip: update keywords saved in pg based on new logic

* autoreview

* docker compose
  • Loading branch information
polomarcus authored Feb 15, 2024
1 parent 18710cc commit 449d562
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 130 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ docker compose up mediatree
Use env variable `START_DATE` like in docker compose (epoch second format : 1705409797).

Otherwise, default is yesterday midnight date.

### Batch update
In case we introduce a new word-detection logic, we must re-apply it to all keywords already saved inside our database.

To do so, set the env variable `UPDATE` to "true", as shown in the docker compose file.
### Fix linting
Before committing, make sure that the line of codes you wrote are conform to PEP8 standard by running:
```bash
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ services:
PORT: 5050 # healthcheck
HEALTHCHECK_SERVER: "0.0.0.0"
# START_DATE: 1704576615 # to test batch import
# UPDATE: "true" # to batch update PG
MEDIATREE_USER : /run/secrets/username_api
MEDIATREE_PASSWORD: /run/secrets/pwd_api
MEDIATREE_AUTH_URL: https://keywords.mediatree.fr/api/auth/token/
Expand Down
136 changes: 24 additions & 112 deletions quotaclimat/data_processing/mediatree/api_import.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,32 @@
### Library imports
import requests
import pandas as pd
import datetime
import json

import logging
import asyncio
from utils import *
import time
import sys
import os
from quotaclimat.utils.healthcheck_config import run_health_check_server
from quotaclimat.utils.logger import CustomFormatter
from quotaclimat.data_processing.mediatree.utils import *
from quotaclimat.data_processing.mediatree.config import *
from quotaclimat.data_processing.mediatree.update_pg_keywords import *
from quotaclimat.data_processing.mediatree.detect_keywords import *
from postgres.insert_data import save_to_pg
from postgres.schemas.models import create_tables, connect_to_db
from postgres.schemas.models import create_tables, connect_to_db, get_db_session
from postgres.schemas.models import keywords_table
from pandas import json_normalize
from quotaclimat.data_processing.mediatree.keyword.keyword import THEME_KEYWORDS
from typing import List, Optional
from quotaclimat.data_ingestion.scrap_sitemap import get_consistent_hash
import re
import swifter
from tenacity import *

#read whole file to a string
password = os.environ.get("MEDIATREE_PASSWORD")
if(password == '/run/secrets/pwd_api'):
password= open("/run/secrets/pwd_api", "r").read()
AUTH_URL = os.environ.get("MEDIATREE_AUTH_URL") #
USER = os.environ.get("MEDIATREE_USER")
if(USER == '/run/secrets/username_api'):
USER=open("/run/secrets/username_api", "r").read()
KEYWORDS_URL = os.environ.get("KEYWORDS_URL") #https://keywords.mediatree.fr/docs/#api-Subtitle-SubtitleList
password = get_password()
AUTH_URL = get_auth_url()
USER = get_user()
KEYWORDS_URL = get_keywords_url()

def refresh_token(token, date):
if is_it_tuesday(date): # refresh token every weekday for batch import
Expand All @@ -40,6 +35,14 @@ def refresh_token(token, date):
else:
return token

# reapply word detector logic to all saved keywords
# use when word detection is changed
async def update_pg_data(exit_event):
    """Re-run the keyword detection logic over every row already saved in Postgres.

    Parameters:
        exit_event: asyncio.Event signalled when the batch update is over so
            that main() can stop waiting and shut down.
    """
    logging.info("Updating already saved data from Postgresql")
    try:
        session = get_db_session()
        update_keywords(session)
    finally:
        # Always unblock main(): without this, an exception raised by
        # update_keywords() would leave `await event_finish.wait()` hanging forever.
        exit_event.set()

async def get_and_save_api_data(exit_event):
conn = connect_to_db()
token=get_auth_token(password=password, user_name=USER)
Expand Down Expand Up @@ -120,102 +123,6 @@ def get_theme_query_includes(theme_dict):
def transform_theme_query_includes(themes_with_keywords = THEME_KEYWORDS):
return list(map(get_theme_query_includes, themes_with_keywords))

def get_cts_in_ms_for_keywords(subtitle_duration: List[dict], keywords: List[str], theme: str) -> List[dict]:
    """Map each matched keyword to the timestamp of the first subtitle cue containing it.

    For every keyword, only its FIRST word is searched for in the cue texts;
    keywords without any matching cue are silently skipped.

    Returns a list of {"keyword", "timestamp", "theme"} dicts.
    """
    result = []

    logging.debug(f"Looking for timecode for {keywords}")
    for multiple_keyword in keywords:
        all_keywords = multiple_keyword.split() # case with multiple words such as 'économie circulaire'
        # lazily scan cues; stop at the first cue whose text contains the first word
        match = next((item for item in subtitle_duration if is_word_in_sentence(all_keywords[0], item.get('text'))), None)
        logging.debug(f"match found {match} with {all_keywords[0].lower()}")
        if match is not None:
            logging.debug(f'Result added due to this match {match} based on {all_keywords[0]}')
            result.append(
                {
                    "keyword" :multiple_keyword.lower(),
                    "timestamp" : match['cts_in_ms'],
                    "theme" : theme
                })

    logging.debug(f"Timecode found {result}")
    return result

# be able to detect singular or plural for a word
def format_word_regex(word: str) -> str:
    """Return a regex fragment matching *word* in singular or plural form.

    Apostrophes get an optional following space ("d'eau" -> "d' ?eau");
    words not ending in 's'/'x'/'à' get an optional trailing 's'; words
    already ending in 's' have that final 's' made optional.
    """
    word = word.replace('\'', '\' ?') # case for d'eau -> d' eau
    if not word.endswith('s') and not word.endswith('x') and not word.endswith('à'):
        return word + "s?"
    elif word.endswith('s'):
        return word + '?'
    else:
        return word

def is_word_in_sentence(words: str, sentence: str) -> bool :
    """Return True if *words* occurs in *sentence* (case-insensitive).

    Each word is expanded with format_word_regex so singular/plural forms
    match; the phrase must start at a word boundary and must not be
    followed by a word character or a hyphen.
    """
    # words can contain plurals and several words
    words = ' '.join(list(map(( lambda x: format_word_regex(x)), words.split(" "))))
    logging.debug(f"testing {words}")
    # test https://regex101.com/r/ilvs9G/1/
    if re.search(rf"\b{words}(?![\w-])", sentence, re.IGNORECASE):
        logging.debug(f"words {words} found in {sentence}")
        return True
    else:
        return False

def get_themes_keywords_duration(plaintext: str, subtitle_duration: List[dict]) -> List[Optional[List[str]]]:
    """Detect which THEME_KEYWORDS themes appear in *plaintext*.

    Returns [matching_themes, keywords_with_timestamp, keyword_count]
    or [None, None, None] when no theme matches.

    NOTE: subtitle_duration items are dicts (read via item.get('text') in
    get_cts_in_ms_for_keywords), hence the List[dict] annotation.
    """
    matching_themes = []
    keywords_with_timestamp = []

    for theme, keywords in THEME_KEYWORDS.items():
        logging.debug(f"searching {theme} for {keywords}")

        matching_words = [word for word in keywords if is_word_in_sentence(word, plaintext)]
        if matching_words:
            logging.debug(f"theme found : {theme} with word {matching_words}")
            matching_themes.append(theme)
            # look for cts_in_ms inside matching_words (['économie circulaire', 'panneaux solaires', 'solaires'] from subtitle_duration
            keywords_to_add = get_cts_in_ms_for_keywords(subtitle_duration, matching_words, theme)
            if(len(keywords_to_add) == 0):
                logging.warning(f"Check regex - Empty keywords but themes is there {theme} - matching_words {matching_words} - {subtitle_duration}")
            keywords_with_timestamp.extend(keywords_to_add)

    if len(matching_themes) > 0:
        return [matching_themes, keywords_with_timestamp, int(len(keywords_with_timestamp))]
    else:
        return [None, None, None]

def log_min_max_date(df):
    """Log the earliest and latest 'start' values of *df* for import traceability."""
    max_date = max(df['start'])
    min_date = min(df['start'])
    logging.info(f"Date min : {min_date}, max : {max_date}")

def filter_and_tag_by_theme(df: pd.DataFrame) -> pd.DataFrame :
    """Tag every subtitle row with detected themes/keywords and drop untagged rows.

    Adds 'theme', 'keywords_with_timestamp' and 'number_of_keywords' columns
    computed from each row's (plaintext, srt) pair, removes rows with no
    theme and drops the raw 'srt' column.
    """
    count_before_filtering = len(df)
    logging.info(f"{count_before_filtering} subtitles to filter by keywords and tag with themes")
    log_min_max_date(df)

    logging.info(f'tagging plaintext subtitle with keywords and theme : regexp - search taking time...')
    # using swifter to speed up apply https://github.com/jmcarpenter2/swifter
    df[['theme', u'keywords_with_timestamp', 'number_of_keywords']] = df[['plaintext','srt']].swifter.apply(lambda row: get_themes_keywords_duration(*row), axis=1, result_type='expand')

    # remove all rows that does not have themes
    df = df.dropna(subset=['theme'])

    df.drop('srt', axis=1, inplace=True)

    logging.info(f"After filtering with out keywords, we have {len(df)} out of {count_before_filtering} subtitles left that are insteresting for us")

    return df

def add_primary_key(df):
    """Build a deterministic per-row primary key so re-imports stay idempotent.

    The key is a consistent hash of start timestamp + channel name; on any
    failure a hash of "empty" is returned as a fallback.
    """
    logging.info("Adding primary key to save to PG and have idempotent result")
    try:
        return (
            df["start"].astype(str) + df["channel_name"]
        ).apply(get_consistent_hash)
    except (Exception) as error:
        logging.error(error)
        return get_consistent_hash("empty") # TODO improve - should be a None ?

# "Randomly wait up to 2^x * 1 seconds between each retry until the range reaches 60 seconds, then randomly up to 60 seconds afterwards"
# @see https://github.com/jd/tenacity/tree/main
@retry(wait=wait_random_exponential(multiplier=1, max=60),stop=stop_after_attempt(7))
Expand Down Expand Up @@ -314,7 +221,7 @@ def log_dataframe_size(df, channel):
logging.warning(f"High Dataframe size : {bytes_size / (1000 * 1000)}")
if(len(df) == 1000):
logging.error("We might lose data - df size is 1000 out of 1000 - we should divide this querry")

async def main():
logger.info("Start api mediatree import")
create_tables()
Expand All @@ -324,7 +231,10 @@ async def main():
health_check_task = asyncio.create_task(run_health_check_server())

# Start batch job
asyncio.create_task(get_and_save_api_data(event_finish))
if(os.environ.get("UPDATE") == "true"):
asyncio.create_task(update_pg_data(event_finish))
else:
asyncio.create_task(get_and_save_api_data(event_finish))

# Wait for both tasks to complete
await event_finish.wait()
Expand Down Expand Up @@ -354,3 +264,5 @@ async def main():

asyncio.run(main())
sys.exit(0)


21 changes: 21 additions & 0 deletions quotaclimat/data_processing/mediatree/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import os

def get_password():
    """Return the Mediatree API password.

    Reads the MEDIATREE_PASSWORD environment variable; when it holds the
    docker-secret path placeholder, the secret file content is returned
    instead. The file is read with a context manager so the handle is
    closed (the original open(...).read() leaked it).
    """
    password = os.environ.get("MEDIATREE_PASSWORD")
    if(password == '/run/secrets/pwd_api'):
        # env var points at a docker secret file rather than the real value
        with open("/run/secrets/pwd_api", "r") as secret_file:
            password = secret_file.read()
    return password

def get_auth_url():
    """Return the Mediatree authentication endpoint URL (MEDIATREE_AUTH_URL env var)."""
    return os.getenv("MEDIATREE_AUTH_URL")

def get_user():
    """Return the Mediatree API user name.

    Reads the MEDIATREE_USER environment variable; when it holds the
    docker-secret path placeholder, the secret file content is returned
    instead. Uses a context manager so the file handle is closed (the
    original open(...).read() leaked it).
    """
    user = os.environ.get("MEDIATREE_USER")
    if(user == '/run/secrets/username_api'):
        # env var points at a docker secret file rather than the real value
        with open("/run/secrets/username_api", "r") as secret_file:
            user = secret_file.read()
    return user

# API docs: https://keywords.mediatree.fr/docs/#api-Subtitle-SubtitleList
def get_keywords_url():
    """Return the Mediatree keywords endpoint URL (KEYWORDS_URL env var)."""
    return os.getenv("KEYWORDS_URL")
131 changes: 131 additions & 0 deletions quotaclimat/data_processing/mediatree/detect_keywords.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import pandas as pd

import logging

from quotaclimat.data_processing.mediatree.utils import *
from quotaclimat.data_processing.mediatree.config import *
from postgres.schemas.models import keywords_table
from quotaclimat.data_processing.mediatree.keyword.keyword import THEME_KEYWORDS
from typing import List, Optional
from quotaclimat.data_ingestion.scrap_sitemap import get_consistent_hash
import re
import swifter

def get_cts_in_ms_for_keywords(subtitle_duration: List[dict], keywords: List[str], theme: str) -> List[dict]:
    """Map each matched keyword to the timestamp of the first subtitle cue containing it.

    For every keyword only its FIRST word is searched for in the cue texts
    (multi-word keywords such as 'économie circulaire' are matched on
    'économie'); keywords without any matching cue are skipped.

    Returns a list of {"keyword", "timestamp", "theme"} dicts.
    """
    result = []

    logging.debug(f"Looking for timecode for {keywords}")
    for multiple_keyword in keywords:
        first_word = multiple_keyword.split()[0]  # multi-word keywords matched on their first word only
        match = None
        for cue in subtitle_duration:
            if is_word_in_sentence(first_word, cue.get('text')):
                match = cue
                break
        logging.debug(f"match found {match} with {first_word.lower()}")
        if match is not None:
            logging.debug(f'Result added due to this match {match} based on {first_word}')
            result.append({
                "keyword": multiple_keyword.lower(),
                "timestamp": match['cts_in_ms'],
                "theme": theme,
            })

    logging.debug(f"Timecode found {result}")
    return result

def format_word_regex(word: str) -> str:
    """Return a regex fragment matching *word* in singular or plural form.

    Apostrophes get an optional following space ("d'eau" -> "d' ?eau");
    a word already ending in 's' has that final 's' made optional; words
    ending in 'x' or 'à' are left as-is; all other words get an optional
    trailing 's'.
    """
    word = word.replace('\'', '\' ?')  # case for d'eau -> d' eau
    if word.endswith('s'):
        return word + '?'
    if word.endswith(('x', 'à')):
        return word
    return word + "s?"

def is_word_in_sentence(words: str, sentence: str) -> bool :
    """Return True if *words* occurs in *sentence* (case-insensitive).

    Each word is expanded with format_word_regex so singular/plural forms
    match; the whole phrase must start at a word boundary and must not be
    followed by a word character or a hyphen.
    """
    # words can contain plurals and several words
    pattern = ' '.join(format_word_regex(token) for token in words.split(" "))
    logging.debug(f"testing {pattern}")
    # test https://regex101.com/r/ilvs9G/1/
    found = re.search(rf"\b{pattern}(?![\w-])", sentence, re.IGNORECASE) is not None
    if found:
        logging.debug(f"words {pattern} found in {sentence}")
    return found

def get_themes_keywords_duration(plaintext: str, subtitle_duration: List[dict]) -> List[Optional[list]]:
    """Detect which THEME_KEYWORDS themes appear in *plaintext*.

    Parameters:
        plaintext: full subtitle text of the program chunk.
        subtitle_duration: per-cue dicts (read via item.get('text') and
            item['cts_in_ms'] downstream) — previously mis-annotated as
            List[str], fixed here.

    Returns:
        [matching_themes, keywords_with_timestamp, deduplicated_keyword_count]
        or [None, None, None] when no theme matches.
    """
    matching_themes = []
    keywords_with_timestamp = []

    for theme, keywords in THEME_KEYWORDS.items():
        logging.debug(f"searching {theme} for {keywords}")

        matching_words = [word for word in keywords if is_word_in_sentence(word, plaintext)]
        if matching_words:
            logging.debug(f"theme found : {theme} with word {matching_words}")
            matching_themes.append(theme)
            # look for cts_in_ms inside matching_words (['économie circulaire', 'panneaux solaires', 'solaires'] from subtitle_duration
            keywords_to_add = get_cts_in_ms_for_keywords(subtitle_duration, matching_words, theme)
            if(len(keywords_to_add) == 0):
                # a theme matched in the plaintext but no cue matched: regex discrepancy worth investigating
                logging.warning(f"Check regex - Empty keywords but themes is there {theme} - matching_words {matching_words} - {subtitle_duration}")
            keywords_with_timestamp.extend(keywords_to_add)

    if len(matching_themes) > 0:
        return [matching_themes, keywords_with_timestamp, count_keywords_duration_overlap(keywords_with_timestamp)]
    else:
        return [None, None, None]

def log_min_max_date(df):
    """Log the earliest and latest 'start' values of *df* for import traceability."""
    earliest = min(df['start'])
    latest = max(df['start'])
    logging.info(f"Date min : {earliest}, max : {latest}")

def filter_and_tag_by_theme(df: pd.DataFrame) -> pd.DataFrame :
    """Tag every subtitle row with detected themes/keywords and drop untagged rows.

    Adds 'theme', 'keywords_with_timestamp' and 'number_of_keywords' columns
    computed from each row's (plaintext, srt) pair, removes rows with no
    detected theme and drops the raw 'srt' column.
    """
    count_before_filtering = len(df)
    logging.info(f"{count_before_filtering} subtitles to filter by keywords and tag with themes")
    log_min_max_date(df)

    logging.info(f'tagging plaintext subtitle with keywords and theme : regexp - search taking time...')
    # using swifter to speed up apply https://github.com/jmcarpenter2/swifter
    df[['theme', 'keywords_with_timestamp', 'number_of_keywords']] = df[['plaintext','srt']].swifter.apply(lambda row: get_themes_keywords_duration(*row), axis=1, result_type='expand')

    # remove all rows that do not have themes
    df = df.dropna(subset=['theme'])

    df.drop('srt', axis=1, inplace=True)

    # log message typos fixed ("with out" -> "without", "insteresting" -> "interesting")
    logging.info(f"After filtering without keywords, we have {len(df)} out of {count_before_filtering} subtitles left that are interesting for us")

    return df

def add_primary_key(df):
    """Build a deterministic per-row primary key so re-imports stay idempotent.

    The key is a consistent hash of the stringified start timestamp plus the
    channel name; on any failure a hash of "empty" is returned as a fallback.
    """
    logging.info("Adding primary key to save to PG and have idempotent result")
    try:
        combined = df["start"].astype(str) + df["channel_name"]
        return combined.apply(get_consistent_hash)
    except Exception as error:
        logging.error(error)
        return get_consistent_hash("empty") # TODO improve - should be a None ?

def count_keywords_duration_overlap(keywords_with_timestamp: List[dict]) -> int:
    """Count keywords whose timestamps are far enough apart to be distinct.

    Entries are sorted by 'timestamp'; a keyword is only counted when the
    gap since the last counted one passes
    is_time_distance_between_keyword_enough, so near-simultaneous mentions
    collapse into a single count.
    """
    total = len(keywords_with_timestamp)
    if total <= 1:
        return total

    # sort defensively in case keywords are not in chronological order
    ordered = sorted(keywords_with_timestamp, key=lambda entry: entry['timestamp'])

    count = 1
    previous_timestamp = ordered[0]['timestamp']

    for keyword_info in ordered[1:]:
        current_timestamp = keyword_info['timestamp']
        overlap_time = current_timestamp - previous_timestamp

        if is_time_distance_between_keyword_enough(overlap_time):
            logging.debug(f"No overlapping keyword {count} + 1 : {overlap_time}")
            count += 1
            previous_timestamp = current_timestamp
        else:
            logging.debug(f"Keyword timestamp overlap : {overlap_time} - current count is {count}")

    return count
Loading

1 comment on commit 449d562

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage

Coverage Report
FileStmtsMissCoverMissing
postgres
   insert_data.py46785%38–40, 59–61, 66
   insert_existing_data_example.py20385%25–27
postgres/schemas
   models.py711579%74–81, 91–92, 101–111
quotaclimat/data_analytics
   analytics_signataire_charte.py29290%1–67
   bilan.py1081080%2–372
   data_coverage.py34340%1–94
   exploration.py1251250%1–440
   sitemap_analytics.py1181180%1–343
quotaclimat/data_ingestion
   categorization_program_type.py110%1
   config_youtube.py110%1
   scaleway_db_backups.py34340%1–74
   scrap_chartejournalismeecologie_signataires.py50500%1–169
   scrap_sitemap.py1341787%27–28, 33–34, 66–71, 95–97, 138–140, 202, 223–228
   scrap_tv_program.py62620%1–149
   scrap_youtube.py1141140%1–238
quotaclimat/data_ingestion/ingest_db
   ingest_sitemap_in_db.py544026%18–39, 42–61, 65–76
quotaclimat/data_ingestion/scrap_html
   scrap_description_article.py36392%19–20, 32
quotaclimat/data_processing/mediatree
   api_import.py17310440%32–36, 41–44, 47–77, 83–98, 103–105, 130–137, 141–144, 148–154, 165–176, 179–183, 189, 214–215, 221, 223, 226–251, 255–266
   config.py15287%7, 16
   detect_keywords.py88693%101–108
   utils.py642167%27–51, 54, 73–74
quotaclimat/data_processing/sitemap
   sitemap_processing.py412734%15–19, 23–25, 29–47, 51–58, 66–96, 101–103
quotaclimat/utils
   channels.py660%1–95
   climate_keywords.py220%3–35
   healthcheck_config.py291452%22–24, 27–38
   logger.py14379%22–24
   plotly_theme.py17170%1–56
TOTAL153096337% 

Tests Skipped Failures Errors Time
38 0 💤 0 ❌ 0 🔥 10.618s ⏱️

Please sign in to comment.