Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create out_zenodo_logs to track changes in downloads over time #209

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions migrations/versions/f5dbba621bb1_add_out_zenodo_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Add out_zenodo_logs

Revision ID: f5dbba621bb1
Revises: b4fee22b4a4d
Create Date: 2024-10-25 13:30:27.163055

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'f5dbba621bb1'
down_revision: Union[str, None] = 'b4fee22b4a4d'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('out_zenodo_logs',
sa.Column('metrics_date', sa.Date(), nullable=False, comment='The date when the metadata was reported.'),
sa.Column('version', sa.String(), nullable=True, comment='The version (e.g. 10.0.0) of the dataset record.'),
sa.Column('dataset_slug', sa.String(), nullable=True, comment='The shorthand for the dataset being archived. Matches the pudl_archiver repository dataset slugs when the dataset is archived by the PUDL archiver.'),
sa.Column('new_dataset_downloads', sa.Integer(), nullable=True, comment='The number of new daily downloads for the entire dataset. A download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.'),
sa.Column('new_dataset_unique_downloads', sa.Integer(), nullable=True, comment='The number of new daily unique downloads for the entire dataset. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.'),
sa.Column('new_dataset_views', sa.Integer(), nullable=True, comment='The number of new daily views for the entire dataset. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.'),
sa.Column('new_dataset_unique_views', sa.Integer(), nullable=True, comment='The number of new daily unique downloads for the entire dataset. A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.'),
sa.Column('new_version_downloads', sa.Integer(), nullable=True, comment='The number of new daily downloads for the version. A total download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.'),
sa.Column('new_version_unique_downloads', sa.Integer(), nullable=True, comment='The number of new daily unique downloads for the version. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.'),
sa.Column('new_version_views', sa.Integer(), nullable=True, comment='The number of new daily views for the version. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.'),
sa.Column('new_version_unique_views', sa.Integer(), nullable=True, comment='The number of new daily unique downloads for the version. A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.'),
sa.Column('version_title', sa.String(), nullable=True, comment='The name of the version in Zenodo.'),
sa.Column('version_id', sa.Integer(), nullable=False, comment='The unique ID of the Zenodo version. This is identical to the version DOI.'),
sa.Column('version_record_id', sa.Integer(), nullable=True, comment='The record ID of the Zenodo version. This is identical to the version ID.'),
sa.Column('concept_record_id', sa.Integer(), nullable=True, comment='The concept record ID. This is shared between all versions of a record.'),
sa.Column('version_creation_date', sa.DateTime(), nullable=True, comment='The datetime the record was created.'),
sa.Column('version_last_modified_date', sa.DateTime(), nullable=True, comment='The datetime the record was last modified.'),
sa.Column('version_last_updated_date', sa.DateTime(), nullable=True, comment='The datetime the record was last updated.'),
sa.Column('version_publication_date', sa.Date(), nullable=True, comment='The date that the version was published.'),
sa.Column('version_doi', sa.String(), nullable=True, comment='The DOI of the Zenodo version.'),
sa.Column('concept_record_doi', sa.String(), nullable=True, comment='The DOI of the Zenodo concept record.'),
sa.Column('version_doi_url', sa.String(), nullable=True, comment='The DOI link of the Zenodo version.'),
sa.Column('version_status', sa.String(), nullable=True, comment='The status of the Zenodo version.'),
sa.Column('version_state', sa.String(), nullable=True, comment='The state of the Zenodo version.'),
sa.Column('version_submitted', sa.Boolean(), nullable=True, comment='Is the version submitted?'),
sa.Column('version_description', sa.String(), nullable=True, comment='The description of the version.'),
sa.Column('partition_key', sa.String(), nullable=True),
sa.PrimaryKeyConstraint('metrics_date', 'version_id')
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('out_zenodo_logs')
# ### end Alembic commands ###
80 changes: 71 additions & 9 deletions notebooks/cloud_sql_connection.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@
"## Using Google connection with Dagster"
]
},
{
"cell_type": "markdown",
"id": "bb2abcf6",
"metadata": {},
"source": [
"First connect to the existing postgres DB. You'll need to whitelist your IP if you haven't already!"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -28,16 +36,70 @@
"%%time\n",
"import numpy as np\n",
"import pandas as pd\n",
"import sqlalchemy as sa\n",
"\n",
"from usage_metrics.resources.postgres import PostgresManager\n",
"\n",
"rng = np.random.default_rng()\n",
"df = pd.DataFrame(rng.integers(0, 100, size=(100, 4)), columns=list(\"ABCD\"))\n",
"from usage_metrics.resources.postgres import PostgresIOManager\n",
"\n",
"engine = PostgresManager()._create_engine()\n",
"engine = PostgresIOManager().engine"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "18b519db",
"metadata": {},
"outputs": [],
"source": [
"metadata = sa.MetaData()\n",
"metadata.reflect(engine)\n",
"\n",
"with engine.connect() as con:\n",
" df.to_sql(\"new_table\", con, if_exists=\"replace\")"
"# Get metadata by table\n",
"metadata.tables"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ea927736",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "4bd5a6d8",
"metadata": {},
"outputs": [],
"source": [
"# To delete data from a single table - TREAD CAREFULLY!\n",
"table_name = \"core_zenodo_logs\"\n",
"#with engine.connect() as conn:\n",
"# conn.execute(metadata.tables['core_zenodo_logs'].delete())"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "29c664a2",
"metadata": {},
"outputs": [],
"source": [
"# To delete everything! TREAD CAREFULLY!\n",
"#metadata.drop_all(engine)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a9224e8c",
"metadata": {},
"outputs": [],
"source": [
"# Check that this worked as expected\n",
"engine = PostgresIOManager().engine\n",
"metadata.reflect(engine)\n",
"metadata.tables"
]
},
{
Expand Down Expand Up @@ -244,7 +306,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "pudl-usage-metrics",
"language": "python",
"name": "python3"
},
Expand All @@ -258,7 +320,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.4"
"version": "3.12.5"
}
},
"nbformat": 4,
Expand Down
2 changes: 1 addition & 1 deletion src/usage_metrics/core/zenodo.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

@asset(
partitions_def=WeeklyPartitionsDefinition(start_date="2023-08-16"),
# io_manager_key="database_manager",
io_manager_key="database_manager",
tags={"source": "zenodo"},
)
def core_zenodo_logs(
Expand Down
1 change: 1 addition & 0 deletions src/usage_metrics/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

out_module_groups = {
"out_s3": [usage_metrics.out.s3],
"out_zenodo": [usage_metrics.out.zenodo],
}

non_partitioned_module_groups = {
Expand Down
138 changes: 138 additions & 0 deletions src/usage_metrics/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,144 @@
Column("partition_key", String),
)

out_zenodo_logs = Table(
"out_zenodo_logs",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be helpful to add table descriptions as doc strings or via Table.comment so we can easily understand the differences between the core and out zenodo tables.

usage_metrics_metadata,
Column(
"metrics_date",
Date,
primary_key=True,
comment="The date when the metadata was reported.",
),
Column(
"version",
String,
comment="The version (e.g. 10.0.0) of the dataset record.",
),
Column(
"dataset_slug",
String,
comment="The shorthand for the dataset being archived. Matches the pudl_archiver repository dataset slugs when the dataset is archived by the PUDL archiver.",
),
Column(
"new_dataset_downloads",
Integer,
comment="The number of new daily downloads for the entire dataset. A download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.",
),
Column(
"new_dataset_unique_downloads",
Integer,
comment="The number of new daily unique downloads for the entire dataset. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.",
),
Column(
"new_dataset_views",
Integer,
comment="The number of new daily views for the entire dataset. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.",
),
Column(
"new_dataset_unique_views",
Integer,
comment="The number of new daily unique downloads for the entire dataset. A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.",
),
Column(
"new_version_downloads",
Integer,
comment="The number of new daily downloads for the version. A total download is a user (human or machine) downloading a file from a record, excluding double-clicks and robots. If a record has multiple files and you download all files, each file counts as one download.",
),
Column(
"new_version_unique_downloads",
Integer,
comment="The number of new daily unique downloads for the version. A unique download is defined as one or more file downloads from files of a single record by a user within a 1-hour time-window. This means that if one or more files of the same record were downloaded multiple times by the same user within the same time-window, it is considered to be one unique download.",
),
Column(
"new_version_views",
Integer,
comment="The number of new daily views for the version. A total view is a user (human or machine) visiting a record, excluding double-clicks and robots.",
),
Column(
"new_version_unique_views",
Integer,
comment="The number of new daily unique downloads for the version. A unique view is defined as one or more visits by a user within a 1-hour time-window. This means that if the same record was accessed multiple times by the same user within the same time-window, Zenodo considers it as one unique view.",
),
Column(
"version_title",
String,
comment="The name of the version in Zenodo.",
),
Column(
"version_id",
Integer,
primary_key=True,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nit but could you move this up to top of this table definition to it's easy to understand the full primary key of the table please?

Just so I understand, the version number represents a release of a dataset? I've been out of the zenodo world for a while.

comment="The unique ID of the Zenodo version. This is identical to the version DOI.",
),
Column(
"version_record_id",
Integer,
comment="The record ID of the Zenodo version. This is identical to the version ID.",
),
Column(
"concept_record_id",
Integer,
comment="The concept record ID. This is shared between all versions of a record.",
),
Column(
"version_creation_date",
DateTime,
comment="The datetime the record was created.",
),
Column(
"version_last_modified_date",
DateTime,
comment="The datetime the record was last modified.",
),
Column(
"version_last_updated_date",
DateTime,
comment="The datetime the record was last updated.",
),
Column(
"version_publication_date",
Date,
comment="The date that the version was published.",
),
Column(
"version_doi",
String,
comment="The DOI of the Zenodo version.",
),
Column(
"concept_record_doi",
String,
comment="The DOI of the Zenodo concept record.",
),
Column(
"version_doi_url",
String,
comment="The DOI link of the Zenodo version.",
),
Column(
"version_status",
String,
comment="The status of the Zenodo version.",
),
Column(
"version_state",
String,
comment="The state of the Zenodo version.",
),
Column(
"version_submitted",
Boolean,
comment="Is the version submitted?",
),
Column(
"version_description",
String,
comment="The description of the version.",
),
Column("partition_key", String),
)

intake_logs = Table(
"intake_logs",
usage_metrics_metadata,
Expand Down
2 changes: 1 addition & 1 deletion src/usage_metrics/out/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Module contains assets that transform data into core assets."""

from . import s3
from . import s3, zenodo
61 changes: 61 additions & 0 deletions src/usage_metrics/out/zenodo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Create outputs from Zenodo logs."""

import pandas as pd
from dagster import (
AssetExecutionContext,
WeeklyPartitionsDefinition,
asset,
)


@asset(
partitions_def=WeeklyPartitionsDefinition(start_date="2023-08-16"),
io_manager_key="database_manager",
tags={"source": "zenodo"},
)
def out_zenodo_logs(
context: AssetExecutionContext,
core_zenodo_logs: pd.DataFrame,
) -> pd.DataFrame:
"""Output daily Zenodo logs.

Calculate differences from the cumulative views and downloads columns.
"""
context.log.info(f"Processing data for the week of {context.partition_key}")

if core_zenodo_logs.empty:
return core_zenodo_logs
metrics_cols = [
"dataset_downloads",
"dataset_unique_downloads",
"dataset_views",
"dataset_unique_views",
"version_downloads",
"version_unique_downloads",
"version_views",
"version_unique_views",
]

# Drop mistaken/deleted archives
df = core_zenodo_logs.loc[~core_zenodo_logs.version_id.isin([13919960, 13920120])]

df["metrics_date"] = pd.to_datetime(df["metrics_date"])
# First, ffill all gaps in between dates in case any daily downloads failed
df = df.set_index(["metrics_date", "version_id"]).unstack().ffill()
# Then, backfill all dates prior to the existence of the datasets with 0 for the metric columns
idx = pd.IndexSlice
df.loc(axis=1)[idx[metrics_cols, :]] = (
df.loc(axis=1)[idx[metrics_cols, :]].fillna(0).astype(int)
)
# Backfill the metadata
df = df.bfill()
# # Convert the cumulative sum columns to diff columns
df.loc(axis=1)[idx[metrics_cols, :]] = (
df.loc(axis=1)[idx[metrics_cols, :]].diff().fillna(0)
)

new_df = df.stack()
# Rename the diff columns
new_df = new_df.rename({col: "new_" + col for col in metrics_cols}, axis=1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you decide to rename these columns?


return new_df.reset_index()
Loading