diff --git a/migrations/versions/f5dbba621bb1_add_out_zenodo_logs.py b/migrations/versions/f5dbba621bb1_add_out_zenodo_logs.py new file mode 100644 index 0000000..c66de76 --- /dev/null +++ b/migrations/versions/f5dbba621bb1_add_out_zenodo_logs.py @@ -0,0 +1,59 @@ +"""Add out_zenodo_logs + +Revision ID: f5dbba621bb1 +Revises: b4fee22b4a4d +Create Date: 2024-10-25 13:30:27.163055 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'f5dbba621bb1' +down_revision: Union[str, None] = 'b4fee22b4a4d' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('out_zenodo_logs', + sa.Column('metrics_date', sa.Date(), nullable=False, comment='The date when the metadata was reported.'), + sa.Column('version', sa.String(), nullable=True, comment='The version (e.g. 10.0.0) of the dataset record.'), + sa.Column('dataset_slug', sa.String(), nullable=True, comment='The shorthand for the dataset being archived. Matches the pudl_archiver repository dataset slugs when the dataset is archived by the PUDL archiver.'), + sa.Column('new_dataset_downloads', sa.Integer(), nullable=True, comment='The number of new daily downloads for the entire dataset. A download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.'), + sa.Column('new_dataset_unique_downloads', sa.Integer(), nullable=True, comment='The number of new daily unique downloads for the entire dataset. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. 
This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.'), + sa.Column('new_dataset_views', sa.Integer(), nullable=True, comment='The number of new daily views for the entire dataset. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.'), + sa.Column('new_dataset_unique_views', sa.Integer(), nullable=True, comment='The number of new daily unique views for the entire dataset. A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.'), + sa.Column('new_version_downloads', sa.Integer(), nullable=True, comment='The number of new daily downloads for the version. A total download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.'), + sa.Column('new_version_unique_downloads', sa.Integer(), nullable=True, comment='The number of new daily unique downloads for the version. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.'), + sa.Column('new_version_views', sa.Integer(), nullable=True, comment='The number of new daily views for the version. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.'), + sa.Column('new_version_unique_views', sa.Integer(), nullable=True, comment='The number of new daily unique views for the version. 
A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.'), + sa.Column('version_title', sa.String(), nullable=True, comment='The name of the version in Zenodo.'), + sa.Column('version_id', sa.Integer(), nullable=False, comment='The unique ID of the Zenodo version. This is identical to the version DOI.'), + sa.Column('version_record_id', sa.Integer(), nullable=True, comment='The record ID of the Zenodo version. This is identical to the version ID.'), + sa.Column('concept_record_id', sa.Integer(), nullable=True, comment='The concept record ID. This is shared between all versions of a record.'), + sa.Column('version_creation_date', sa.DateTime(), nullable=True, comment='The datetime the record was created.'), + sa.Column('version_last_modified_date', sa.DateTime(), nullable=True, comment='The datetime the record was last modified.'), + sa.Column('version_last_updated_date', sa.DateTime(), nullable=True, comment='The datetime the record was last updated.'), + sa.Column('version_publication_date', sa.Date(), nullable=True, comment='The date that the version was published.'), + sa.Column('version_doi', sa.String(), nullable=True, comment='The DOI of the Zenodo version.'), + sa.Column('concept_record_doi', sa.String(), nullable=True, comment='The DOI of the Zenodo concept record.'), + sa.Column('version_doi_url', sa.String(), nullable=True, comment='The DOI link of the Zenodo version.'), + sa.Column('version_status', sa.String(), nullable=True, comment='The status of the Zenodo version.'), + sa.Column('version_state', sa.String(), nullable=True, comment='The state of the Zenodo version.'), + sa.Column('version_submitted', sa.Boolean(), nullable=True, comment='Is the version submitted?'), + sa.Column('version_description', sa.String(), nullable=True, comment='The description of the version.'), + 
sa.Column('partition_key', sa.String(), nullable=True), + sa.PrimaryKeyConstraint('metrics_date', 'version_id') + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('out_zenodo_logs') + # ### end Alembic commands ### diff --git a/notebooks/cloud_sql_connection.ipynb b/notebooks/cloud_sql_connection.ipynb index 3e0cb8a..af06475 100644 --- a/notebooks/cloud_sql_connection.ipynb +++ b/notebooks/cloud_sql_connection.ipynb @@ -18,6 +18,14 @@ "## Using Google connection with Dagster" ] }, + { + "cell_type": "markdown", + "id": "bb2abcf6", + "metadata": {}, + "source": [ + "First connect to the existing postgres DB. You'll need to whitelist your IP if you haven't already!" + ] + }, { "cell_type": "code", "execution_count": null, @@ -28,16 +36,70 @@ "%%time\n", "import numpy as np\n", "import pandas as pd\n", + "import sqlalchemy as sa\n", "\n", - "from usage_metrics.resources.postgres import PostgresManager\n", - "\n", - "rng = np.random.default_rng()\n", - "df = pd.DataFrame(rng.integers(0, 100, size=(100, 4)), columns=list(\"ABCD\"))\n", + "from usage_metrics.resources.postgres import PostgresIOManager\n", "\n", - "engine = PostgresManager()._create_engine()\n", + "engine = PostgresIOManager().engine" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18b519db", + "metadata": {}, + "outputs": [], + "source": [ + "metadata = sa.MetaData()\n", + "metadata.reflect(engine)\n", "\n", - "with engine.connect() as con:\n", - " df.to_sql(\"new_table\", con, if_exists=\"replace\")" + "# Get metadata by table\n", + "metadata.tables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ea927736", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4bd5a6d8", + "metadata": {}, + "outputs": [], + "source": [ + "# To delete data from a single table - TREAD CAREFULLY!\n", + "table_name = 
\"core_zenodo_logs\"\n", + "#with engine.connect() as conn:\n", + "# conn.execute(metadata.tables['core_zenodo_logs'].delete())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "29c664a2", + "metadata": {}, + "outputs": [], + "source": [ + "# To delete everything! TREAD CAREFULLY!\n", + "#metadata.drop_all(engine)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a9224e8c", + "metadata": {}, + "outputs": [], + "source": [ + "# Check that this worked as expected\n", + "engine = PostgresIOManager().engine\n", + "metadata.reflect(engine)\n", + "metadata.tables" ] }, { @@ -244,7 +306,7 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "pudl-usage-metrics", "language": "python", "name": "python3" }, @@ -258,7 +320,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.4" + "version": "3.12.5" } }, "nbformat": 4, diff --git a/src/usage_metrics/core/zenodo.py b/src/usage_metrics/core/zenodo.py index fd941f0..c5d72cb 100644 --- a/src/usage_metrics/core/zenodo.py +++ b/src/usage_metrics/core/zenodo.py @@ -12,7 +12,7 @@ @asset( partitions_def=WeeklyPartitionsDefinition(start_date="2023-08-16"), - # io_manager_key="database_manager", + io_manager_key="database_manager", tags={"source": "zenodo"}, ) def core_zenodo_logs( diff --git a/src/usage_metrics/etl/__init__.py b/src/usage_metrics/etl/__init__.py index d737f0d..6a6a1c1 100644 --- a/src/usage_metrics/etl/__init__.py +++ b/src/usage_metrics/etl/__init__.py @@ -44,6 +44,7 @@ out_module_groups = { "out_s3": [usage_metrics.out.s3], + "out_zenodo": [usage_metrics.out.zenodo], } non_partitioned_module_groups = { diff --git a/src/usage_metrics/models.py b/src/usage_metrics/models.py index 6357b4c..5359602 100644 --- a/src/usage_metrics/models.py +++ b/src/usage_metrics/models.py @@ -466,6 +466,144 @@ Column("partition_key", String), ) +out_zenodo_logs = Table( + "out_zenodo_logs", + 
usage_metrics_metadata, + Column( + "metrics_date", + Date, + primary_key=True, + comment="The date when the metadata was reported.", + ), + Column( + "version", + String, + comment="The version (e.g. 10.0.0) of the dataset record.", + ), + Column( + "dataset_slug", + String, + comment="The shorthand for the dataset being archived. Matches the pudl_archiver repository dataset slugs when the dataset is archived by the PUDL archiver.", + ), + Column( + "new_dataset_downloads", + Integer, + comment="The number of new daily downloads for the entire dataset. A download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.", + ), + Column( + "new_dataset_unique_downloads", + Integer, + comment="The number of new daily unique downloads for the entire dataset. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.", + ), + Column( + "new_dataset_views", + Integer, + comment="The number of new daily views for the entire dataset. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.", + ), + Column( + "new_dataset_unique_views", + Integer, + comment="The number of new daily unique views for the entire dataset. A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.", + ), + Column( + "new_version_downloads", + Integer, + comment="The number of new daily downloads for the version. 
A total download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.", + ), + Column( + "new_version_unique_downloads", + Integer, + comment="The number of new daily unique downloads for the version. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.", + ), + Column( + "new_version_views", + Integer, + comment="The number of new daily views for the version. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.", + ), + Column( + "new_version_unique_views", + Integer, + comment="The number of new daily unique views for the version. A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.", + ), + Column( + "version_title", + String, + comment="The name of the version in Zenodo.", + ), + Column( + "version_id", + Integer, + primary_key=True, + comment="The unique ID of the Zenodo version. This is identical to the version DOI.", + ), + Column( + "version_record_id", + Integer, + comment="The record ID of the Zenodo version. This is identical to the version ID.", + ), + Column( + "concept_record_id", + Integer, + comment="The concept record ID. 
This is shared between all versions of a record.", + ), + Column( + "version_creation_date", + DateTime, + comment="The datetime the record was created.", + ), + Column( + "version_last_modified_date", + DateTime, + comment="The datetime the record was last modified.", + ), + Column( + "version_last_updated_date", + DateTime, + comment="The datetime the record was last updated.", + ), + Column( + "version_publication_date", + Date, + comment="The date that the version was published.", + ), + Column( + "version_doi", + String, + comment="The DOI of the Zenodo version.", + ), + Column( + "concept_record_doi", + String, + comment="The DOI of the Zenodo concept record.", + ), + Column( + "version_doi_url", + String, + comment="The DOI link of the Zenodo version.", + ), + Column( + "version_status", + String, + comment="The status of the Zenodo version.", + ), + Column( + "version_state", + String, + comment="The state of the Zenodo version.", + ), + Column( + "version_submitted", + Boolean, + comment="Is the version submitted?", + ), + Column( + "version_description", + String, + comment="The description of the version.", + ), + Column("partition_key", String), +) + intake_logs = Table( "intake_logs", usage_metrics_metadata, diff --git a/src/usage_metrics/out/__init__.py b/src/usage_metrics/out/__init__.py index cad926c..b36c8db 100644 --- a/src/usage_metrics/out/__init__.py +++ b/src/usage_metrics/out/__init__.py @@ -1,3 +1,3 @@ """Module contains assets that transform data into core assets.""" -from . import s3 +from . 
import s3, zenodo diff --git a/src/usage_metrics/out/zenodo.py b/src/usage_metrics/out/zenodo.py new file mode 100644 index 0000000..55d8091 --- /dev/null +++ b/src/usage_metrics/out/zenodo.py @@ -0,0 +1,61 @@ +"""Create outputs from Zenodo logs.""" + +import pandas as pd +from dagster import ( + AssetExecutionContext, + WeeklyPartitionsDefinition, + asset, +) + + +@asset( + partitions_def=WeeklyPartitionsDefinition(start_date="2023-08-16"), + io_manager_key="database_manager", + tags={"source": "zenodo"}, +) +def out_zenodo_logs( + context: AssetExecutionContext, + core_zenodo_logs: pd.DataFrame, +) -> pd.DataFrame: + """Output daily Zenodo logs. + + Calculate differences from the cumulative views and downloads columns. + """ + context.log.info(f"Processing data for the week of {context.partition_key}") + + if core_zenodo_logs.empty: + return core_zenodo_logs + metrics_cols = [ + "dataset_downloads", + "dataset_unique_downloads", + "dataset_views", + "dataset_unique_views", + "version_downloads", + "version_unique_downloads", + "version_views", + "version_unique_views", + ] + + # Drop mistaken/deleted archives + df = core_zenodo_logs.loc[~core_zenodo_logs.version_id.isin([13919960, 13920120])] + + df["metrics_date"] = pd.to_datetime(df["metrics_date"]) + # First, ffill all gaps in between dates in case any daily downloads failed + df = df.set_index(["metrics_date", "version_id"]).unstack().ffill() + # Then, backfill all dates prior to the existence of the datasets with 0 for the metric columns + idx = pd.IndexSlice + df.loc(axis=1)[idx[metrics_cols, :]] = ( + df.loc(axis=1)[idx[metrics_cols, :]].fillna(0).astype(int) + ) + # Backfill the metadata + df = df.bfill() + # # Convert the cumulative sum columns to diff columns + df.loc(axis=1)[idx[metrics_cols, :]] = ( + df.loc(axis=1)[idx[metrics_cols, :]].diff().fillna(0) + ) + + new_df = df.stack() + # Rename the diff columns + new_df = new_df.rename({col: "new_" + col for col in metrics_cols}, axis=1) + + return 
new_df.reset_index()