Skip to content

Commit

Permalink
perf: use modin instead of pandas
Browse files Browse the repository at this point in the history
  • Loading branch information
polomarcus committed Feb 23, 2024
1 parent bc7cc25 commit 49d7a67
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 5 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ services:
POSTGRES_PASSWORD: password
POSTGRES_HOST: postgres_db
POSTGRES_PORT: 5432
MODIN_ENGINE: dask #TODO test me
tty: true # colorize terminal
volumes:
- ./quotaclimat/:/app/quotaclimat/
Expand Down Expand Up @@ -142,6 +143,7 @@ services:
MEDIATREE_PASSWORD: /run/secrets/pwd_api
MEDIATREE_AUTH_URL: https://keywords.mediatree.fr/api/auth/token/
KEYWORDS_URL: https://keywords.mediatree.fr/api/subtitle/ # https://keywords.mediatree.fr/docs/#api-Subtitle-SubtitleList
MODIN_ENGINE: dask #TODO test me
volumes:
- ./quotaclimat/:/app/quotaclimat/
- ./postgres/:/app/postgres/
Expand Down
23 changes: 22 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tenacity = "^8.2.3"
sentry-sdk = "^1.40.5"
coverage = "^7.4.2"
modin = {extras = ["all"], version = "^0.27.0"}
dask-expr = "^0.4.2"

[build-system]
requires = ["poetry-core>=1.1"]
Expand Down
5 changes: 3 additions & 2 deletions quotaclimat/data_processing/mediatree/api_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ async def get_and_save_api_data(exit_event):
try:
df = extract_api_sub(token, channel, type_sub, date_epoch)
if(df is not None):
save_to_pg(df, keywords_table, conn)
# must ._to_pandas() because modin to_sql is not working
save_to_pg(df._to_pandas(), keywords_table, conn)
else:
logging.info("Nothing to save to Postgresql")
except Exception as err:
Expand Down Expand Up @@ -230,7 +231,7 @@ def parse_reponse_subtitle(response_sub, channel = None) -> Optional[pd.DataFram
log_dataframe_size(new_df, channel)

logging.debug("Parsed %s" % (new_df.head(1).to_string()))
logging.info("Parsed Schema\n%s", new_df.dtypes)
logging.debug("Parsed Schema\n%s", new_df.dtypes)

return new_df
else:
Expand Down
3 changes: 3 additions & 0 deletions quotaclimat/data_processing/mediatree/detect_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import swifter
from itertools import groupby
import modin.pandas as pd
import dask

dask.config.set({'dataframe.query-planning': True})

def get_cts_in_ms_for_keywords(subtitle_duration: List[dict], keywords: List[str], theme: str) -> List[dict]:
result = []
Expand Down
1 change: 1 addition & 0 deletions test/sitemap/mediatree.json

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions test/sitemap/test_main_import_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import logging

from quotaclimat.data_processing.mediatree.update_pg_keywords import *

from postgres.insert_data import (clean_data,
insert_data_in_sitemap_table)

from postgres.schemas.models import create_tables, get_db_session, get_keyword, connect_to_db
from postgres.insert_data import save_to_pg
from quotaclimat.data_processing.mediatree.detect_keywords import *
from quotaclimat.data_processing.mediatree.api_import import *

import time as t


def test_main_api_import():
    """End-to-end import check: parse the mediatree JSON fixture, tag themes,
    attach primary keys, persist to Postgres, then verify the saved row count
    matches the parsed dataframe.
    """
    create_tables()
    conn = connect_to_db()

    fixture_path = 'test/sitemap/mediatree.json'
    with open(fixture_path, 'r') as fixture:
        json_response = json.load(fixture)

    start_time = t.time()
    df = parse_reponse_subtitle(json_response)
    df = filter_and_tag_by_theme(df)
    df["id"] = add_primary_key(df)
    end_time = t.time()
    logging.info(f"Elapsed time for api import {end_time - start_time}")

    # to_sql cannot consume a modin dataframe, hence the explicit conversion
    save_to_pg(df._to_pandas(), keywords_table, conn)

    session = get_db_session(conn)
    saved_keywords = get_keywords_columns(session)
    assert len(saved_keywords) == len(df)
2 changes: 1 addition & 1 deletion test/sitemap/test_mediatree.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def test_parse_reponse_subtitle():
expected_result['start'] = pd.to_datetime(expected_result['start'], unit='s').dt.tz_localize('UTC').dt.tz_convert('Europe/Paris')
df = parse_reponse_subtitle(json_response)
debug_df(df)
# df = df.to_pandas()

pd.testing.assert_frame_equal(df._to_pandas().reset_index(drop=True), expected_result.reset_index(drop=True))

def test_get_includes_or_query():
Expand Down
2 changes: 1 addition & 1 deletion test/sitemap/test_update_pg_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_insert_data_in_sitemap_table():
}])
df['start'] = pd.to_datetime(df['start'], unit='ms').dt.tz_localize('UTC').dt.tz_convert('Europe/Paris')

assert save_to_pg(df, keywords_table, conn) == 1
assert save_to_pg(df._to_pandas(), keywords_table, conn) == 1

# check the value is well existing
result_before_update = get_keyword(primary_key)
Expand Down

1 comment on commit 49d7a67

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage

Coverage Report
FileStmtsMissCoverMissing
postgres
   insert_data.py46785%38–40, 59–61, 66
   insert_existing_data_example.py20385%25–27
postgres/schemas
   models.py711579%74–81, 91–92, 101–111
quotaclimat/data_analytics
   analytics_signataire_charte.py29290%1–67
   bilan.py1081080%2–372
   data_coverage.py34340%1–94
   exploration.py1251250%1–440
   sitemap_analytics.py1181180%1–343
quotaclimat/data_ingestion
   categorization_program_type.py110%1
   config_youtube.py110%1
   scaleway_db_backups.py34340%1–74
   scrap_chartejournalismeecologie_signataires.py50500%1–169
   scrap_sitemap.py1341787%27–28, 33–34, 66–71, 95–97, 138–140, 202, 223–228
   scrap_tv_program.py62620%1–149
   scrap_youtube.py1141140%1–238
quotaclimat/data_ingestion/ingest_db
   ingest_sitemap_in_db.py584031%30–51, 56–75, 79–90
quotaclimat/data_ingestion/scrap_html
   scrap_description_article.py36392%19–20, 32
quotaclimat/data_processing/mediatree
   api_import.py17910541%46–50, 55–58, 62–65, 71, 74–100, 106–121, 126–128, 153–160, 164–167, 171–177, 188–199, 202–206, 212, 238–239, 243, 248–274, 278–289
   config.py15287%7, 16
   detect_keywords.py105397%122–124
   utils.py642167%27–51, 54, 73–74
quotaclimat/data_processing/sitemap
   sitemap_processing.py412734%15–19, 23–25, 29–47, 51–58, 66–96, 101–103
quotaclimat/utils
   channels.py660%1–95
   climate_keywords.py220%3–35
   healthcheck_config.py291452%22–24, 27–38
   logger.py14379%22–24
   plotly_theme.py17170%1–56
TOTAL156396139% 

Tests Skipped Failures Errors Time
44 0 💤 0 ❌ 0 🔥 50.291s ⏱️

Please sign in to comment.