From 850b425caa0ebe85777e08f9fb9ba71064ab640d Mon Sep 17 00:00:00 2001 From: ahnazary Date: Tue, 26 Nov 2024 23:53:42 +0100 Subject: [PATCH] Fix black and isort linters --- .../dagster_university/assets/metrics.py | 43 +++++++++----- .../repository/repository_test.py | 58 +++++++++++++++++++ finance/__init__.py | 12 +++- finance/src/postgres_assets.py | 2 +- finance/src/tickers_assets.py | 18 ------ 5 files changed, 95 insertions(+), 38 deletions(-) create mode 100644 dagster_university/dagster_university/repository/repository_test.py diff --git a/dagster_university/dagster_university/assets/metrics.py b/dagster_university/dagster_university/assets/metrics.py index 26f432e..55cf04d 100644 --- a/dagster_university/dagster_university/assets/metrics.py +++ b/dagster_university/dagster_university/assets/metrics.py @@ -5,6 +5,7 @@ import plotly.express as px import plotly.io as pio import requests + from dagster import asset from . import constants @@ -23,6 +24,7 @@ def taxi_trips_file(): ) as output_file: output_file.write(raw_trips.content) + @asset(deps=["taxi_trips_file"]) def taxi_trips(): """ @@ -49,7 +51,6 @@ def taxi_trips(): conn.execute(sql_query) - @asset(deps=["taxi_zones_file"]) def taxi_zones(): sql_query = f""" @@ -125,14 +126,15 @@ def manhattan_map(): pio.write_image(fig, constants.MANHATTAN_MAP_FILE_PATH) + from datetime import datetime, timedelta -from . import constants import pandas as pd -@asset( - deps=["taxi_trips"] -) +from . import constants + + +@asset(deps=["taxi_trips"]) def trips_by_week(): conn = duckdb.connect(os.getenv("DUCKDB_DATABASE")) @@ -152,12 +154,19 @@ def trips_by_week(): data_for_week = conn.execute(query).fetch_df() - aggregate = data_for_week.agg({ - "vendor_id": "count", - "total_amount": "sum", - "trip_distance": "sum", - "passenger_count": "sum" - }).rename({"vendor_id": "num_trips"}).to_frame().T # type: ignore + aggregate = ( + data_for_week.agg( + { + "vendor_id": "count", + "total_amount": "sum", + "trip_distance": "sum", + "passenger_count": "sum", + } + ) + .rename({"vendor_id": "num_trips"}) + .to_frame() + .T + ) # type: ignore aggregate["period"] = current_date @@ -166,11 +175,13 @@ def trips_by_week(): current_date += timedelta(days=7) # clean up the formatting of the dataframe - result['num_trips'] = result['num_trips'].astype(int) - result['passenger_count'] = result['passenger_count'].astype(int) - result['total_amount'] = result['total_amount'].round(2).astype(float) - result['trip_distance'] = result['trip_distance'].round(2).astype(float) - result = result[["period", "num_trips", "total_amount", "trip_distance", "passenger_count"]] + result["num_trips"] = result["num_trips"].astype(int) + result["passenger_count"] = result["passenger_count"].astype(int) + result["total_amount"] = result["total_amount"].round(2).astype(float) + result["trip_distance"] = result["trip_distance"].round(2).astype(float) + result = result[ + ["period", "num_trips", "total_amount", "trip_distance", "passenger_count"] + ] result = result.sort_values(by="period") result.to_csv(constants.TRIPS_BY_WEEK_FILE_PATH, index=False) diff --git a/dagster_university/dagster_university/repository/repository_test.py b/dagster_university/dagster_university/repository/repository_test.py new file mode 100644 index 0000000..6f3a1c7 --- /dev/null +++ b/dagster_university/dagster_university/repository/repository_test.py @@ -0,0 +1,58 @@ +from dagster import RunRequest, ScheduleDefinition, asset, job, op, repository, sensor + + +@asset +def asset1(): + pass + + +@asset +def asset2(): + pass + + +@asset(group_name="mygroup") +def asset3(): + pass + + +@op +def hello(): + pass + + +@job +def job1(): + hello() + + +@job +def job2(): + hello() + + +@job +def job3(): + hello() + + +job1_schedule = ScheduleDefinition(job=job1, cron_schedule="0 0 * * *") + + +@sensor(job=job2) +def job2_sensor(): + should_run = True + if should_run: + yield RunRequest(run_key=None, run_config={}) + + +@repository +def my_repository(): + return [ + asset1, + asset2, + asset3, + job1_schedule, + job2_sensor, + job3, + ] diff --git a/finance/__init__.py b/finance/__init__.py index 95aa781..ac73c49 100644 --- a/finance/__init__.py +++ b/finance/__init__.py @@ -1,8 +1,14 @@ # fmt: off -from dagster import Definitions, load_assets_from_modules +from dagster import ( + AssetIn, + Definitions, + asset, + in_process_executor, + load_assets_from_modules, + mem_io_manager, +) from .src import postgres_assets, tickers_assets -from dagster import AssetIn, Definitions, asset, in_process_executor, mem_io_manager _postgres_assets = load_assets_from_modules([postgres_assets]) _tickers_assets = load_assets_from_modules([tickers_assets]) @@ -13,4 +19,4 @@ "io_manager": mem_io_manager, }, executor=in_process_executor, -) \ No newline at end of file +) diff --git a/finance/src/postgres_assets.py b/finance/src/postgres_assets.py index 3fcb663..6d1713f 100644 --- a/finance/src/postgres_assets.py +++ b/finance/src/postgres_assets.py @@ -17,7 +17,7 @@ mem_io_manager, multi_asset, ) -from finance.src.utils import custom_logger +from finance.utils import custom_logger logger = custom_logger(__name__) diff --git a/finance/src/tickers_assets.py b/finance/src/tickers_assets.py index be30d43..2d33cd7 100644 --- a/finance/src/tickers_assets.py +++ b/finance/src/tickers_assets.py @@ -87,21 +87,3 @@ def tickers_to_query_from_yahoo() -> list: result = conn.execute(query).fetchmany(self.batch_size) return [result[0] for result in result] - - -defs = Definitions( - assets=[ - neon_postgres_engine, - postgres_schema, - balance_sheet_table, - cashflow_table, - income_stmt_table, - financials_table, - tickers_list_table, - valid_tickers_table, - ], - resources={ - "io_manager": mem_io_manager, - }, - executor=in_process_executor, -)