# Patch summary: "Clean position on updated vessels only".
#
# Text placed before the first `diff --git` header is not part of any hunk.
#
# What the hunks below change:
#   * New Alembic revision dbe1b900df12 (revises 5bfe00a08853): upgrade() creates
#     the index spire_ais_data_spire_update_statement_idx on
#     spire_ais_data(spire_update_statement); downgrade() drops it.
#   * repository_spire_ais_data.get_all_data_between_date: the date-window filter
#     and the ordering use SpireAisData.spire_update_statement instead of
#     SpireAisData.created_at.
#   * repository_vessel_position: get_positions_with_vessel_created_updated_after
#     is renamed to get_positions_with_vessel_updated_after, its parameter is
#     renamed to updated_after, and the filter compares VesselPosition.timestamp
#     instead of VesselPosition.created_at.
#   * clean_positions: map_vessel_position_to_domain fills `timestamp` from
#     row["spire_update_statement"]; run() logs the batch window and derives
#     max_created from the "spire_update_statement" column.
#   * create_update_excursions_segments.run(): calls the renamed repository
#     method.
#
# NOTE(review): upgrade() and downgrade() each end with a `pass` that follows a
#   real statement; it looks redundant.
# NOTE(review): get_positions_with_vessel_updated_after now filters on
#   VesselPosition.timestamp but still orders by VesselPosition.created_at --
#   confirm the ordering column is intended.
# NOTE(review): VesselPosition.timestamp is now taken from spire_update_statement
#   rather than position_timestamp -- confirm downstream consumers expect that.
# NOTE(review): clean_positions.py gains a commented-out debug print; consider
#   dropping it before merge.
# NOTE(review): the new "Lecture des nouvelles positions ..." log message mixes
#   French and English.
# NOTE(review): line breaks, indentation and blank context lines below were
#   reconstructed from a whitespace-collapsed copy of this patch; verify them
#   against the target files (or apply in a whitespace-tolerant mode).
#
diff --git a/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py b/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py
new file mode 100644
index 00000000..10452d34
--- /dev/null
+++ b/backend/alembic/versions/dbe1b900df12_clean_position_on_updated_vessels_only.py
@@ -0,0 +1,29 @@
+"""Clean position on updated vessels only
+
+Revision ID: dbe1b900df12
+Revises: 5bfe00a08853
+Create Date: 2024-12-26 13:10:19.501996
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'dbe1b900df12'
+down_revision = '5bfe00a08853'
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.create_index("spire_ais_data_spire_update_statement_idx",
+                    "spire_ais_data",
+                    ["spire_update_statement"])
+    pass
+
+
+def downgrade() -> None:
+    op.drop_index("spire_ais_data_spire_update_statement_idx",
+                  "spire_ais_data")
+    pass
diff --git a/backend/bloom/infra/repositories/repository_spire_ais_data.py b/backend/bloom/infra/repositories/repository_spire_ais_data.py
index 696ed98d..63c2092b 100644
--- a/backend/bloom/infra/repositories/repository_spire_ais_data.py
+++ b/backend/bloom/infra/repositories/repository_spire_ais_data.py
@@ -109,10 +109,10 @@ def get_all_data_between_date(
         ).where(
             and_(
                 sql_model.Vessel.tracking_activated == True,
-                sql_model.SpireAisData.created_at > created_updated_after,
-                sql_model.SpireAisData.created_at <= created_updated_before
+                sql_model.SpireAisData.spire_update_statement > created_updated_after,
+                sql_model.SpireAisData.spire_update_statement <= created_updated_before
             )
-        ).order_by(sql_model.SpireAisData.created_at.asc())
+        ).order_by(sql_model.SpireAisData.spire_update_statement.asc())
         result = session.execute(stmt)
         return pd.DataFrame(result, columns=[
             "id",
diff --git a/backend/bloom/infra/repositories/repository_vessel_position.py b/backend/bloom/infra/repositories/repository_vessel_position.py
index 65e340ac..656de736 100644
--- a/backend/bloom/infra/repositories/repository_vessel_position.py
+++ b/backend/bloom/infra/repositories/repository_vessel_position.py
@@ -53,8 +53,8 @@ def get_vessel_positions(self, session: Session, vessel_id:int,
         else:
             return []
 
-    def get_positions_with_vessel_created_updated_after(self, session: Session,
-                                                        created_updated_after: datetime) -> pd.DataFrame:
+    def get_positions_with_vessel_updated_after(self, session: Session,
+                                                updated_after: datetime) -> pd.DataFrame:
         stmt = select(sql_model.VesselPosition.id, sql_model.VesselPosition.timestamp,
                       sql_model.VesselPosition.accuracy, sql_model.VesselPosition.collection_type,
                       sql_model.VesselPosition.course, sql_model.VesselPosition.heading,
@@ -62,7 +62,7 @@ def get_positions_with_vessel_created_updated_after(self, session: Session,
                       sql_model.VesselPosition.latitude, sql_model.VesselPosition.rot,
                       sql_model.VesselPosition.speed, sql_model.VesselPosition.created_at,
                       sql_model.Vessel.id, sql_model.Vessel.mmsi).where(
-            sql_model.VesselPosition.created_at > created_updated_after
+            sql_model.VesselPosition.timestamp > updated_after
         ).join(sql_model.Vessel, sql_model.VesselPosition.vessel_id == sql_model.Vessel.id).order_by(
             sql_model.VesselPosition.created_at.asc())
 
diff --git a/backend/bloom/tasks/clean_positions.py b/backend/bloom/tasks/clean_positions.py
index 4e801f87..4367a49c 100644
--- a/backend/bloom/tasks/clean_positions.py
+++ b/backend/bloom/tasks/clean_positions.py
@@ -27,7 +27,7 @@ def get_distance(current_position: tuple, last_position: tuple):
 def map_vessel_position_to_domain(row: pd.Series) -> VesselPosition:
     return VesselPosition(
         vessel_id=row["vessel_id"],
-        timestamp=row["position_timestamp"],
+        timestamp=row["spire_update_statement"],
         accuracy=row["position_accuracy"],
         collection_type=row["position_collection_type"],
         course=row["position_course"],
@@ -66,13 +66,13 @@ def run(batch_time):
         )
         batch_limit = point_in_time + timedelta(days=batch_time)
         # Step 1: load SPIRE batch: read from SpireAisData
-        logger.info(f"Lecture des nouvelles positions de Spire en base")
+        logger.info(f"Lecture des nouvelles positions de Spire en base between {point_in_time} and {batch_limit}")
         batch = spire_repository.get_all_data_between_date(session, point_in_time, batch_limit)
 
         # Recherche de la date de l'enregistrement traité le plus récent.
         # Cette date est stockée comme date d'exécution du traitement ce qui permettra de repartir de cette date
         # à la prochaine execution pour traiter les enregistrements + récents
-        max_created = max(batch["created_at"]) if len(batch) > 0 else batch_limit
+        max_created = max(batch["spire_update_statement"]) if len(batch) > 0 else batch_limit
         logger.info(f"Traitement des positions entre le {point_in_time} et le {max_created}")
         position_count=len(batch)
         logger.info(f"{position_count} nouvelles positions de Spire")
@@ -164,6 +164,7 @@ def run(batch_time):
 
         # Step 9: insert to DataBase
         clean_positions = batch.apply(map_vessel_position_to_domain, axis=1).values.tolist()
+        #print(f"Batch final:\n{clean_positions}")
         vessel_position_repository.batch_create_vessel_position(session, clean_positions)
         TaskExecutionRepository.set_point_in_time(session, "clean_positions", max_created)
         logger.info(f"Ecriture de {len(clean_positions)} positions dans la table vessel_positions")
diff --git a/backend/bloom/tasks/create_update_excursions_segments.py b/backend/bloom/tasks/create_update_excursions_segments.py
index 943384b9..bb0b4f51 100644
--- a/backend/bloom/tasks/create_update_excursions_segments.py
+++ b/backend/bloom/tasks/create_update_excursions_segments.py
@@ -115,7 +115,7 @@ def run():
                 session, "create_update_excursions_segments",
             )
         logger.info(f"Lecture des nouvelles positions depuis le {point_in_time}")
-        batch = vessel_position_repository.get_positions_with_vessel_created_updated_after(session, point_in_time)
+        batch = vessel_position_repository.get_positions_with_vessel_updated_after(session, point_in_time)
         position_count=len(batch)
         logger.info(f"{position_count} nouvelles positions")
         last_segment = segment_repository.get_last_vessel_id_segments(session)