Skip to content

Commit

Permalink
feat: update - use number_of_batch - add channel_title if missing (#184)
Browse files Browse the repository at this point in the history
* feat: update - use number_of_batch - add channel_title if missing

* fix;: test

* fix: test

* fix: test
  • Loading branch information
polomarcus authored Jun 18, 2024
1 parent 7f6b5df commit 2caa6d7
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 320 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ After having updated `UPDATE` env variable to true inside docker-compose.yml and
update_pg_keywords.py:20 | Difference old 1000 - new_number_of_keywords 0
```

We can adjust batch update with these env variables (as in the docker-compose.yml):
```
BATCH_SIZE: 50000 # number of records to update in one batch
NUMBER_OF_BATCH: 4 # number of batches to process
```

### Batch program data
`UPDATE_PROGRAM_ONLY` to true will only update program metadata, otherwise, it will update program metadata and all theme/keywords calculations.

Expand Down
8 changes: 5 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ services:
#entrypoint: ["python", "quotaclimat/data_processing/mediatree/api_import.py"]
environment:
ENV: docker # change me to prod for real cases
LOGLEVEL: DEBUG # Change me to info (debug, info, warning, error) to have less log
LOGLEVEL: INFO # Change me to info (debug, info, warning, error) to have less log
PYTHONPATH: /app
POSTGRES_USER: user
POSTGRES_DB: barometre
Expand All @@ -114,9 +114,11 @@ services:
PORT_HS: 5050 # healthcheck
HEALTHCHECK_SERVER: "0.0.0.0"
# SENTRY_DSN: prod_only
#UPDATE: "true" # to batch update PG
# UPDATE: "true" # to batch update PG
#UPDATE_PROGRAM_ONLY: "true" # to batch update PG but only channel with program
# START_OFFSET: 100 # to batch update PG from a offset
START_OFFSET: 1 # to batch update PG from a offset
#BATCH_SIZE: 50000 # number of records to update in one batch
#NUMBER_OF_BATCH: 4 # number of batch size to process
# START_DATE: 1717227223 # to test batch import
CHANNEL : france2 # to reimport only one channel
MEDIATREE_USER : /run/secrets/username_api
Expand Down
599 changes: 300 additions & 299 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ swifter = "^1.4.0"
tenacity = "^8.2.3"
sentry-sdk = "^1.44.1"
coverage = "^7.4.2"
modin = {extras = ["ray"], version = "^0.30.0"}
modin = {extras = ["ray"], version = "^0.30.1"}

[build-system]
requires = ["poetry-core>=1.1"]
Expand Down
20 changes: 13 additions & 7 deletions quotaclimat/data_processing/mediatree/api_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,20 @@ def refresh_token(token, date):
async def update_pg_data(exit_event):
start_offset = int(os.environ.get("START_OFFSET", 0))
batch_size = int(os.environ.get("BATCH_SIZE", 50000))
number_of_batch = int(os.environ.get("NUMBER_OF_BATCH", 6))
program_only = os.environ.get("UPDATE_PROGRAM_ONLY", "false") == "true"
if(program_only):
logging.warning("Update : Program only mode activated")

logging.warning(f"Updating already saved data from Postgresql from offset {start_offset} - env variable START_OFFSET")
#TODO get program here

logging.warning(f"Updating already saved data from Postgresql from offset {start_offset} - env variable START_OFFSET until {start_offset + number_of_batch * batch_size}")
try:
session = get_db_session()
update_keywords(session, batch_size=batch_size, start_offset=start_offset, program_only=program_only)
update_keywords(session, batch_size=batch_size, start_offset=start_offset, program_only=program_only, number_of_batch=number_of_batch)
exit_event.set()
except Exception as err:
logging.error("Could update_pg_data %s:(%s) %s" % (type(err).__name__, err))
logging.error("Could update_pg_data %s:(%s)" % (type(err).__name__, err))

def get_channels():
if(os.environ.get("ENV") == "docker" or os.environ.get("CHANNEL") is not None):
Expand All @@ -80,10 +83,7 @@ async def get_and_save_api_data(exit_event):
with sentry_sdk.start_transaction(op="task", name="get_and_save_api_data"):
try:
logging.warning(f"Available CPUS {os.cpu_count()} - MODIN_CPUS config : {os.environ.get('MODIN_CPUS', 0)}")
context = ray.init(
dashboard_host="0.0.0.0", # for docker dashboard
)
logging.info(f"ray context dahsboard : {context.dashboard_url}")

conn = connect_to_db()
token=get_auth_token(password=password, user_name=USER)
type_sub = 's2t'
Expand Down Expand Up @@ -269,6 +269,12 @@ async def main():
event_finish = asyncio.Event()
# Start the health check server in the background
health_check_task = asyncio.create_task(run_health_check_server())

context = ray.init(
dashboard_host="0.0.0.0", # for docker dashboard
)
logging.info(f"Ray context dahsboard available at : {context.dashboard_url}")
logging.warning(f"Ray Information about the env: {ray.available_resources()}")

# Start batch job
if(os.environ.get("UPDATE") == "true"):
Expand Down
46 changes: 45 additions & 1 deletion quotaclimat/data_processing/mediatree/channel_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,48 @@ def get_programs_for_this_day(day: datetime, channel_name: str, df_program: pd.D
'end': get_epoch_from_datetime(row['end'].tz_localize("Europe/Paris"))
}), axis=1)
logging.info(f"Program of {channel_name} : {programs_of_a_day}")
return programs_of_a_day
return programs_of_a_day

def get_channel_title_for_name(channel_name: str) -> str:
    """Return the human-readable channel title for a mediatree channel name.

    Looks the name up in a static mapping; on an unknown name an error is
    logged and the empty string is returned so callers can detect the miss.
    """
    # Static mediatree-name -> display-title table (TV then radio channels).
    titles = {
        "tf1": "TF1",
        "france2": "France 2",
        "fr3-idf": "France 3-idf",
        "m6": "M6",
        "arte": "Arte",
        "d8": "C8",
        "bfmtv": "BFM TV",
        "lci": "LCI",
        "franceinfotv": "France Info",
        "itele": "CNews",
        "europe1": "Europe 1",
        "france-culture": "France Culture",
        "france-inter": "France Inter",
        "sud-radio": "Sud Radio",
        "rmc": "RMC",
        "rtl": "RTL",
        "france24": "France 24",
        "france-info": "FranceinfoRadio",
        "rfi": "RFI",
    }
    title = titles.get(channel_name)
    if title is None:
        # Same fallback contract as before: log the unknown name, return "".
        logging.error(f"Channel_name unknown {channel_name}")
        return ""
    return title
30 changes: 21 additions & 9 deletions quotaclimat/data_processing/mediatree/update_pg_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,28 @@
from sqlalchemy.orm import Session
from postgres.schemas.models import Keywords
from quotaclimat.data_processing.mediatree.detect_keywords import *
from quotaclimat.data_processing.mediatree.channel_program import get_programs, get_a_program_with_start_timestamp
from quotaclimat.data_processing.mediatree.channel_program import get_programs, get_a_program_with_start_timestamp, get_channel_title_for_name
from sqlalchemy import func, select, delete

def update_keywords(session: Session, batch_size: int = 50000, start_offset : int = 0, program_only=False) -> list:
def update_keywords(session: Session, batch_size: int = 50000, start_offset : int = 0, program_only=False, number_of_batch: int = 4) -> list:
total_updates = get_total_count_saved_keywords(session)
logging.info(f"Updating {total_updates} saved keywords from {start_offset} offsets - batch size {batch_size}")
until_offset = start_offset + (number_of_batch * batch_size)
if(until_offset > total_updates):
logging.info(f"Until offset ({until_offset}) too high max ={total_updates}, using max instead - change number_of_batch env variable if needed")
until_offset = total_updates

logging.info(f"Updating {total_updates} saved keywords from {start_offset} offsets - batch size {batch_size} - until offset {until_offset}")
df_programs = get_programs()

for i in range(start_offset, total_updates, batch_size):
for i in range(start_offset, until_offset, batch_size):
current_batch_saved_keywords = get_keywords_columns(session, i, batch_size)
logging.info(f"Updating {len(current_batch_saved_keywords)} elements from {i} offsets - batch size {batch_size}")
for keyword_id, plaintext, keywords_with_timestamp, number_of_keywords, start, srt, theme, channel_name in current_batch_saved_keywords:
logging.info(f"Updating {len(current_batch_saved_keywords)} elements from {i} offsets - batch size {batch_size} - until offset {until_offset}")
for keyword_id, plaintext, keywords_with_timestamp, number_of_keywords, start, srt, theme, channel_name, channel_title in current_batch_saved_keywords:
program_name, program_name_type = get_a_program_with_start_timestamp(df_programs, pd.Timestamp(start).tz_convert('Europe/Paris'), channel_name)

if channel_title is None:
logging.debug("channel_title none, set it using channel_name")
channel_title = get_channel_title_for_name(channel_name)

if(not program_only):
try:
matching_themes, \
Expand Down Expand Up @@ -71,14 +79,15 @@ def update_keywords(session: Session, batch_size: int = 50000, start_offset : in
,number_of_biodiversite_solutions_directes
,channel_program=program_name
,channel_program_type=program_name_type
,channel_title=channel_title
)
else:
update_keyword_row_program(session
,keyword_id
,channel_program=program_name
,channel_program_type=program_name_type
)
logging.info(f"bulk update done {i} out of {total_updates}")
logging.info(f"bulk update done {i} out of {until_offset} - (max offset {total_updates})")
session.commit()

logging.info("updated all keywords")
Expand All @@ -95,6 +104,7 @@ def get_keywords_columns(session: Session, page: int = 0, batch_size: int = 5000
Keywords.srt,
Keywords.theme,
Keywords.channel_name,
Keywords.channel_title,
)
.offset(page)
.limit(batch_size)
Expand Down Expand Up @@ -123,6 +133,7 @@ def update_keyword_row(session: Session,
number_of_biodiversite_solutions_directes: int,
channel_program: str,
channel_program_type: str,
channel_title: str,
):
if matching_themes is not None:
session.query(Keywords).filter(Keywords.id == keyword_id).update(
Expand All @@ -142,7 +153,8 @@ def update_keyword_row(session: Session,
Keywords.number_of_biodiversite_consequences:number_of_biodiversite_consequences ,
Keywords.number_of_biodiversite_solutions_directes:number_of_biodiversite_solutions_directes,
Keywords.channel_program: channel_program,
Keywords.channel_program_type: channel_program_type
Keywords.channel_program_type: channel_program_type,
Keywords.channel_title: channel_title
},
synchronize_session=False
)
Expand Down
6 changes: 6 additions & 0 deletions test/sitemap/test_update_pg_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def test_delete_keywords():
"number_of_biodiversite_solutions_directes" : wrong_value,
"channel_program_type": "to change",
"channel_program":"to change"
,"channel_title":"channel_title"
}])
assert save_to_pg(df, keywords_table, conn) == 1
session = get_db_session(conn)
Expand All @@ -65,6 +66,7 @@ def test_delete_keywords():
,0
,"télématin"
,"Information - Magazine"
,"M6"
)
assert get_keyword(primary_key) == None

Expand Down Expand Up @@ -173,6 +175,7 @@ def test_first_update_keywords():
"number_of_biodiversite_solutions_directes" : wrong_value,
"channel_program_type": "to change",
"channel_program":"to change"
,"channel_title":None
}])

assert save_to_pg(df, keywords_table, conn) == 1
Expand Down Expand Up @@ -242,3 +245,6 @@ def test_first_update_keywords():
assert result_after_update.channel_program == "1245 le mag"
assert result_after_update.channel_program_type == "Information - Magazine"

#channel_title
assert result_after_update.channel_title == "M6"

1 comment on commit 2caa6d7

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage

Coverage Report
FileStmtsMissCoverMissing
postgres
   insert_data.py44784%36–38, 57–59, 64
   insert_existing_data_example.py19384%25–27
postgres/schemas
   models.py1431093%117–124, 136–137, 195–196, 210–211
quotaclimat/data_ingestion
   scrap_sitemap.py1341787%27–28, 33–34, 66–71, 95–97, 138–140, 202, 223–228
quotaclimat/data_ingestion/ingest_db
   ingest_sitemap_in_db.py553733%21–42, 45–58, 62–73
quotaclimat/data_ingestion/scrap_html
   scrap_description_article.py36392%19–20, 32
quotaclimat/data_processing/mediatree
   api_import.py20012338%43–47, 52–67, 71–74, 80, 83–122, 128–143, 147–148, 161–173, 177–183, 196–207, 210–214, 220, 255–256, 260, 264–293, 296–298
   channel_program.py1345162%21–23, 34–36, 50, 86, 95, 133–174
   config.py15287%7, 16
   detect_keywords.py180498%178, 230–232
   update_pg_keywords.py513727%14–93, 115–116, 138–164, 170
   utils.py662365%18, 29–53, 56, 65, 81–82
quotaclimat/utils
   healthcheck_config.py291452%22–24, 27–38
   logger.py241154%22–24, 28–37
   sentry.py10280%21–22
TOTAL116634470% 

Tests Skipped Failures Errors Time
80 0 💤 0 ❌ 0 🔥 58.080s ⏱️

Please sign in to comment.