Skip to content

Commit

Permalink
fix: batch update (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
polomarcus authored Mar 3, 2024
1 parent 2c38c7e commit dda12f9
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 10 deletions.
4 changes: 3 additions & 1 deletion quotaclimat/data_processing/mediatree/api_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import modin.pandas as pd
from modin.pandas import json_normalize
from quotaclimat.utils.sentry import sentry_init

logging.getLogger('modin.logger.default').setLevel(logging.ERROR)
logging.getLogger('distributed.scheduler').setLevel(logging.ERROR)
logging.getLogger('distributed').setLevel(logging.ERROR)
sentry_init()

#read whole file to a string
Expand Down
2 changes: 2 additions & 0 deletions quotaclimat/data_processing/mediatree/detect_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import modin.pandas as pd
import dask
from quotaclimat.utils.logger import getLogger
logging.getLogger('modin.logger.default').setLevel(logging.ERROR)
logging.getLogger('distributed.scheduler').setLevel(logging.ERROR)
dask.config.set({'dataframe.query-planning': True})

def get_cts_in_ms_for_keywords(subtitle_duration: List[dict], keywords: List[str], theme: str) -> List[dict]:
Expand Down
23 changes: 16 additions & 7 deletions quotaclimat/data_processing/mediatree/update_pg_keywords.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
### Library imports
import requests
import modin.pandas as pd
import json

import logging

from sqlalchemy.orm import Session
from postgres.schemas.models import Keywords
from quotaclimat.data_processing.mediatree.detect_keywords import *
from sqlalchemy import func, select

def update_keywords(session: Session, batch_size: int = 50000) -> list:
saved_keywords = get_keywords_columns(session)
total_updates = len(saved_keywords)
logging.info(f"Updating {total_updates} saved keywords - keywords_with_timestamp and number_of_keywords columns")
total_updates = get_total_count_saved_keywords(session)
logging.info(f"Updating {total_updates} saved keywords")

for i in range(0, total_updates, batch_size):
batch_updates = saved_keywords[i:i+batch_size]
for keyword_id, plaintext, keywords_with_timestamp, number_of_keywords, start, srt, theme in batch_updates:
current_batch_saved_keywords = get_keywords_columns(session, i, batch_size)
logging.info(f"Updating {len(current_batch_saved_keywords)} elements from {i} offsets - batch size {batch_size}")
for keyword_id, plaintext, keywords_with_timestamp, number_of_keywords, start, srt, theme in current_batch_saved_keywords:
matching_themes, new_keywords_with_timestamp, new_number_of_keywords = get_themes_keywords_duration(plaintext, srt, start)

if(number_of_keywords != new_number_of_keywords or keywords_with_timestamp != new_keywords_with_timestamp or theme != matching_themes):
Expand All @@ -32,7 +34,7 @@ def update_keywords(session: Session, batch_size: int = 50000) -> list:
logging.info("updated all keywords")


def get_keywords_columns(session: Session) -> list:
def get_keywords_columns(session: Session, page: int = 0, batch_size: int = 50000) -> list:
return (
session.query(
Keywords.id,
Expand All @@ -43,9 +45,16 @@ def get_keywords_columns(session: Session) -> list:
Keywords.srt,
Keywords.theme
)
.offset(page)
.limit(batch_size)
.all()
)

def get_total_count_saved_keywords(session: Session) -> int:
    """Return the number of rows currently stored in the Keywords table.

    Issues a single ``SELECT count(*)`` against ``Keywords`` through the
    given session and returns the scalar result.
    """
    count_query = select(func.count()).select_from(Keywords)
    result = session.execute(count_query)
    return result.scalar()


def update_keyword_row(session: Session, keyword_id: int, new_number_of_keywords: int, new_keywords_with_timestamp: List[dict], matching_themes: List[str]):
session.query(Keywords).filter(Keywords.id == keyword_id).update(
{
Expand Down
5 changes: 3 additions & 2 deletions test/sitemap/test_update_pg_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def test_insert_data_in_sitemap_table():
# check the value is well existing
result_before_update = get_keyword(primary_key)
session = get_db_session(conn)
update_keywords(session)
update_keywords(session, batch_size=50)
result_after_update = get_keyword(primary_key)

new_theme, new_keywords_with_timestamp, new_value = get_themes_keywords_duration(plaintext, srt, start)
Expand Down Expand Up @@ -153,4 +153,5 @@ def test_insert_data_in_sitemap_table():
assert result_after_update.keywords_with_timestamp == new_keywords_with_timestamp
assert expected_keywords_with_timestamp == new_keywords_with_timestamp
# theme
assert result_after_update.theme == ["changement_climatique_constat", "adaptation_climatique_solutions_indirectes"]
assert result_after_update.theme == ["changement_climatique_constat", "adaptation_climatique_solutions_indirectes"]
assert new_theme == ["changement_climatique_constat", "adaptation_climatique_solutions_indirectes"]

1 comment on commit dda12f9

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage

Coverage Report
| File | Stmts | Miss | Cover | Missing |
|---|---|---|---|---|
| postgres | | | | |
|    insert_data.py | 44 | 7 | 84% | 37–39, 58–60, 65 |
|    insert_existing_data_example.py | 20 | 3 | 85% | 25–27 |
| postgres/schemas | | | | |
|    models.py | 72 | 15 | 79% | 76–83, 93–94, 103–113 |
| quotaclimat/data_analytics | | | | |
|    analytics_signataire_charte.py | 29 | 29 | 0% | 1–67 |
|    bilan.py | 108 | 108 | 0% | 2–372 |
|    data_coverage.py | 34 | 34 | 0% | 1–94 |
|    exploration.py | 125 | 125 | 0% | 1–440 |
|    sitemap_analytics.py | 118 | 118 | 0% | 1–343 |
| quotaclimat/data_ingestion | | | | |
|    categorization_program_type.py | 1 | 1 | 0% | 1 |
|    config_youtube.py | 1 | 1 | 0% | 1 |
|    scaleway_db_backups.py | 34 | 34 | 0% | 1–74 |
|    scrap_chartejournalismeecologie_signataires.py | 50 | 50 | 0% | 1–169 |
|    scrap_sitemap.py | 134 | 17 | 87% | 27–28, 33–34, 66–71, 95–97, 138–140, 202, 223–228 |
|    scrap_tv_program.py | 62 | 62 | 0% | 1–149 |
|    scrap_youtube.py | 114 | 114 | 0% | 1–238 |
| quotaclimat/data_ingestion/ingest_db | | | | |
|    ingest_sitemap_in_db.py | 59 | 41 | 31% | 21–42, 45–65, 69–80 |
| quotaclimat/data_ingestion/scrap_html | | | | |
|    scrap_description_article.py | 36 | 3 | 92% | 19–20, 32 |
| quotaclimat/data_processing/mediatree | | | | |
|    api_import.py | 180 | 103 | 43% | 40–44, 49–55, 59–62, 68, 71–98, 104–119, 124–126, 151–163, 167–170, 174–180, 191–202, 205–209, 215, 239–240, 244, 248–267, 270–272 |
|    config.py | 15 | 2 | 87% | 7, 16 |
|    detect_keywords.py | 157 | 3 | 98% | 182–184 |
|    utils.py | 66 | 22 | 67% | 19, 30–54, 57, 76–77 |
| quotaclimat/data_processing/sitemap | | | | |
|    sitemap_processing.py | 41 | 27 | 34% | 15–19, 23–25, 29–47, 51–58, 66–96, 101–103 |
| quotaclimat/utils | | | | |
|    channels.py | 6 | 6 | 0% | 1–95 |
|    climate_keywords.py | 2 | 2 | 0% | 3–35 |
|    healthcheck_config.py | 29 | 14 | 52% | 22–24, 27–38 |
|    logger.py | 24 | 11 | 54% | 22–24, 28–37 |
|    plotly_theme.py | 17 | 17 | 0% | 1–56 |
|    sentry.py | 10 | 2 | 80% | 21–22 |
| TOTAL | 1642 | 971 | 41% | |

Tests Skipped Failures Errors Time
64 0 💤 0 ❌ 0 🔥 1m 9s ⏱️

Please sign in to comment.