Skip to content

Commit

Permalink
Fix black and isort linters
Browse files Browse the repository at this point in the history
  • Loading branch information
ahnazary committed Nov 26, 2024
1 parent 449024a commit 850b425
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 38 deletions.
43 changes: 27 additions & 16 deletions dagster_university/dagster_university/assets/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import plotly.express as px
import plotly.io as pio
import requests

from dagster import asset

from . import constants
Expand All @@ -23,6 +24,7 @@ def taxi_trips_file():
) as output_file:
output_file.write(raw_trips.content)


@asset(deps=["taxi_trips_file"])
def taxi_trips():
"""
Expand All @@ -49,7 +51,6 @@ def taxi_trips():
conn.execute(sql_query)



@asset(deps=["taxi_zones_file"])
def taxi_zones():
sql_query = f"""
Expand Down Expand Up @@ -125,14 +126,15 @@ def manhattan_map():

pio.write_image(fig, constants.MANHATTAN_MAP_FILE_PATH)


from datetime import datetime, timedelta
from . import constants

import pandas as pd

@asset(
deps=["taxi_trips"]
)
from . import constants


@asset(deps=["taxi_trips"])
def trips_by_week():
conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))

Expand All @@ -152,12 +154,19 @@ def trips_by_week():

data_for_week = conn.execute(query).fetch_df()

aggregate = data_for_week.agg({
"vendor_id": "count",
"total_amount": "sum",
"trip_distance": "sum",
"passenger_count": "sum"
}).rename({"vendor_id": "num_trips"}).to_frame().T # type: ignore
aggregate = (
data_for_week.agg(
{
"vendor_id": "count",
"total_amount": "sum",
"trip_distance": "sum",
"passenger_count": "sum",
}
)
.rename({"vendor_id": "num_trips"})
.to_frame()
.T
) # type: ignore

aggregate["period"] = current_date

Expand All @@ -166,11 +175,13 @@ def trips_by_week():
current_date += timedelta(days=7)

# clean up the formatting of the dataframe
result['num_trips'] = result['num_trips'].astype(int)
result['passenger_count'] = result['passenger_count'].astype(int)
result['total_amount'] = result['total_amount'].round(2).astype(float)
result['trip_distance'] = result['trip_distance'].round(2).astype(float)
result = result[["period", "num_trips", "total_amount", "trip_distance", "passenger_count"]]
result["num_trips"] = result["num_trips"].astype(int)
result["passenger_count"] = result["passenger_count"].astype(int)
result["total_amount"] = result["total_amount"].round(2).astype(float)
result["trip_distance"] = result["trip_distance"].round(2).astype(float)
result = result[
["period", "num_trips", "total_amount", "trip_distance", "passenger_count"]
]
result = result.sort_values(by="period")

result.to_csv(constants.TRIPS_BY_WEEK_FILE_PATH, index=False)
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from dagster import RunRequest, ScheduleDefinition, asset, job, op, repository, sensor


@asset
def asset1():
pass


@asset
def asset2():
pass


@asset(group_name="mygroup")
def asset3():
pass


@op
def hello():
pass


@job
def job1():
hello()


@job
def job2():
hello()


@job
def job3():
hello()


job1_schedule = ScheduleDefinition(job=job1, cron_schedule="0 0 * * *")


@sensor(job=job2)
def job2_sensor():
should_run = True
if should_run:
yield RunRequest(run_key=None, run_config={})


@repository
def my_repository():
return [
asset1,
asset2,
asset3,
job1_schedule,
job2_sensor,
job3,
]
12 changes: 9 additions & 3 deletions finance/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# fmt: off
from dagster import Definitions, load_assets_from_modules
from dagster import (
AssetIn,
Definitions,
asset,
in_process_executor,
load_assets_from_modules,
mem_io_manager,
)

from .src import postgres_assets, tickers_assets
from dagster import AssetIn, Definitions, asset, in_process_executor, mem_io_manager

_postgres_assets = load_assets_from_modules([postgres_assets])
_tickers_assets = load_assets_from_modules([tickers_assets])
Expand All @@ -13,4 +19,4 @@
"io_manager": mem_io_manager,
},
executor=in_process_executor,
)
)
2 changes: 1 addition & 1 deletion finance/src/postgres_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
mem_io_manager,
multi_asset,
)
from finance.src.utils import custom_logger
from finance.utils import custom_logger

logger = custom_logger(__name__)

Expand Down
18 changes: 0 additions & 18 deletions finance/src/tickers_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,3 @@ def tickers_to_query_from_yahoo() -> list:
result = conn.execute(query).fetchmany(self.batch_size)

return [result[0] for result in result]


defs = Definitions(
assets=[
neon_postgres_engine,
postgres_schema,
balance_sheet_table,
cashflow_table,
income_stmt_table,
financials_table,
tickers_list_table,
valid_tickers_table,
],
resources={
"io_manager": mem_io_manager,
},
executor=in_process_executor,
)

0 comments on commit 850b425

Please sign in to comment.