Skip to content

Commit

Permalink
Merge v0.7 into production (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
gsarrco authored Nov 9, 2023
2 parents c75b1a6 + 40c5b43 commit 1097835
Show file tree
Hide file tree
Showing 9 changed files with 364 additions and 113 deletions.
4 changes: 3 additions & 1 deletion alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from sqlalchemy import pool

from server.base.models import Base
from server.sources import engine_url
from config import config as engine_config

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

engine_url = f"postgresql://{engine_config['PGUSER']}:{engine_config['PGPASSWORD']}@{engine_config['PGHOST']}:{engine_config['PGPORT']}/" \
f"{engine_config['PGDATABASE']}"
config.set_main_option('sqlalchemy.url', engine_url)

# Interpret the config file for Python logging.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Add active fields to stations and stops
Revision ID: 3e63dbd74ceb
Revises: 1f2c7b1eec8b
Create Date: 2023-11-09 16:37:09.843639
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '3e63dbd74ceb'
down_revision = '1f2c7b1eec8b'
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column('stations', sa.Column('active', sa.Boolean(), server_default='true', nullable=False))
op.add_column('stops', sa.Column('active', sa.Boolean(), server_default='true', nullable=False))


def downgrade() -> None:
op.drop_column('stops', 'active')
op.drop_column('stations', 'active')
80 changes: 80 additions & 0 deletions alembic/versions/7c12f6bfe3c6_merge_trip_into_stoptime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Merge Trip into StopTime
Revision ID: 7c12f6bfe3c6
Revises: 1f2c7b1eec8b
Create Date: 2023-11-05 09:16:46.640362
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = '7c12f6bfe3c6'
down_revision = '3e63dbd74ceb'
branch_labels = None
depends_on = None


def upgrade() -> None:
op.drop_constraint('stop_times_trip_id_fkey', 'stop_times', type_='foreignkey')

# Create new fields as nullable true temporarily
op.add_column('stop_times', sa.Column('orig_id', sa.String(), nullable=True))
op.add_column('stop_times', sa.Column('dest_text', sa.String(), nullable=True))
op.add_column('stop_times', sa.Column('number', sa.Integer(), nullable=True))
op.add_column('stop_times', sa.Column('orig_dep_date', sa.Date(), nullable=True))
op.add_column('stop_times', sa.Column('route_name', sa.String(), nullable=True))
op.add_column('stop_times', sa.Column('source', sa.String(), server_default='treni', nullable=True))

# populate new fields with data from trips through stop_times.trip_id
op.execute('''
UPDATE stop_times
SET
orig_id = trips.orig_id,
dest_text = trips.dest_text,
number = trips.number,
orig_dep_date = trips.orig_dep_date,
route_name = trips.route_name,
source = trips.source
FROM trips
WHERE stop_times.trip_id = trips.id
''')

# convert new fields to not nullable
op.alter_column('stop_times', 'orig_id', nullable=False)
op.alter_column('stop_times', 'dest_text', nullable=False)
op.alter_column('stop_times', 'number', nullable=False)
op.alter_column('stop_times', 'orig_dep_date', nullable=False)
op.alter_column('stop_times', 'route_name', nullable=False)
op.alter_column('stop_times', 'source', nullable=False)

# drop trip_id column
op.drop_column('stop_times', 'trip_id')

# drop trips table
op.drop_table('trips')


def downgrade() -> None:
op.create_table('trips',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('orig_id', sa.String(), autoincrement=False, nullable=False),
sa.Column('dest_text', sa.String(), autoincrement=False, nullable=False),
sa.Column('number', sa.Integer(), autoincrement=False, nullable=False),
sa.Column('orig_dep_date', sa.Date(), autoincrement=False, nullable=False),
sa.Column('route_name', sa.String(), autoincrement=False, nullable=False),
sa.Column('source', sa.String(), server_default='treni', autoincrement=False, nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('source', 'number', 'orig_dep_date',
name='trips_source_number_orig_dep_date_key')
)

op.add_column('stop_times', sa.Column('trip_id', sa.INTEGER(), autoincrement=False, nullable=False))
op.create_foreign_key('stop_times_trip_id_fkey', 'stop_times', 'trips', ['trip_id'], ['id'], ondelete='CASCADE')

op.drop_column('stop_times', 'source')
op.drop_column('stop_times', 'route_name')
op.drop_column('stop_times', 'orig_dep_date')
op.drop_column('stop_times', 'number')
op.drop_column('stop_times', 'dest_text')
op.drop_column('stop_times', 'orig_id')
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Convert stop_times to partitioned table
Revision ID: d55702afa188
Revises: 1f2c7b1eec8b
Create Date: 2023-10-29 16:09:44.815425
"""
from alembic import op


# revision identifiers, used by Alembic.
revision = 'd55702afa188'
down_revision = '7c12f6bfe3c6'
branch_labels = None
depends_on = None


# Define the migration
def upgrade():
# remove foreign key "stop_times_stop_id_fkey"
op.drop_constraint('stop_times_stop_id_fkey', 'stop_times', type_='foreignkey')

# rename table "stop_times" to "stop_times_reg"
op.rename_table('stop_times', 'stop_times_reg')

# create the partitioned table "stop_times" for field "orig_dep_date"
op.execute("""
CREATE TABLE stop_times (
id SERIAL NOT NULL,
stop_id character varying NOT NULL,
sched_arr_dt timestamp without time zone,
sched_dep_dt timestamp without time zone,
platform character varying,
orig_dep_date date NOT NULL,
orig_id character varying NOT NULL,
dest_text character varying NOT NULL,
number integer NOT NULL,
route_name character varying NOT NULL,
source character varying,
CONSTRAINT stop_times_stop_id_fkey FOREIGN key(stop_id) REFERENCES stops(id)
) PARTITION BY RANGE (orig_dep_date);
CREATE UNIQUE INDEX stop_times_unique_idx ON stop_times(stop_id, number, source, orig_dep_date);
""")


def downgrade():
# drop the partitioned table "stop_times"
op.drop_table('stop_times')

# rename table "stop_times_reg" to "stop_times"
op.rename_table('stop_times_reg', 'stop_times')

# add foreign key "stop_times_stop_id_fkey"
op.create_foreign_key('stop_times_stop_id_fkey', 'stop_times', 'stops', ['stop_id'], ['id'])
25 changes: 22 additions & 3 deletions save_data.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
from datetime import date, timedelta

from sqlalchemy.orm import sessionmaker
from sqlalchemy import inspect

from server.GTFS import GTFS
from server.sources import engine
from server.base.models import StopTime
from server.sources import engine, session
from server.trenitalia import Trenitalia
from server.typesense import connect_to_typesense

Expand All @@ -20,7 +22,6 @@ def run():
args = parser.parse_args()
force_update_stations = args.force_update_stations

session = sessionmaker(bind=engine)()
typesense = connect_to_typesense()

sources = [
Expand All @@ -29,6 +30,24 @@ def run():
Trenitalia(session, typesense, force_update_stations=force_update_stations),
]

session.commit()

today = date.today()

for i in range(3):
day: date = today + timedelta(days=i)
partition = StopTime.create_partition(day)
if not inspect(engine).has_table(partition.__table__.name):
partition.__table__.create(bind=engine)

while True:
i = -2
day = today + timedelta(days=i)
try:
StopTime.detach_partition(day)
except Exception:
break

for source in sources:
try:
source.save_data()
Expand Down
13 changes: 6 additions & 7 deletions server/GTFS/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from sqlalchemy import select, func
from tqdm import tqdm

from server.base import Source, Station, Stop, TripStopTime
from server.base import Source, Station, Stop, TripStopTime, StopTime
from .clustering import get_clusters_of_stops, get_loc_from_stop_and_cluster
from .models import CStop

Expand Down Expand Up @@ -309,13 +309,12 @@ def get_sqlite_stop_times(self, day: date, start_time: time, end_time: time, lim

def search_lines(self, name):
today = date.today()
from server.base import Trip
trips = self.session.execute(
select(func.max(Trip.number), Trip.dest_text)\
.filter(Trip.orig_dep_date == today)\
.filter(Trip.route_name == name)\
.group_by(Trip.dest_text)\
.order_by(func.count(Trip.id).desc()))\
select(func.max(StopTime.number), StopTime.dest_text) \
.filter(StopTime.orig_dep_date == today) \
.filter(StopTime.route_name == name) \
.group_by(StopTime.dest_text) \
.order_by(func.count(StopTime.number).desc())) \
.all()

results = [(trip[0], name, trip[1]) for trip in trips]
Expand Down
Loading

0 comments on commit 1097835

Please sign in to comment.