Skip to content

Commit

Permalink
feat: tableau sftp fix encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
cbini committed Jan 3, 2025
1 parent 11c1365 commit ddea977
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 92 deletions.
2 changes: 0 additions & 2 deletions src/teamster/code_locations/kipptaf/smartrecruiters/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

from teamster.libraries.smartrecruiters.schema import Applicant, Application

pas_options = py_avro_schema.Option.NO_DOC | py_avro_schema.Option.NO_AUTO_NAMESPACE

APPLICANTS_SCHEMA = json.loads(
py_avro_schema.generate(py_type=Applicant, namespace="applicant")
)
Expand Down
13 changes: 8 additions & 5 deletions src/teamster/code_locations/kipptaf/tableau/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dagster import config_from_files

from teamster.code_locations.kipptaf import CODE_LOCATION, LOCAL_TIMEZONE
from teamster.code_locations.kipptaf.tableau.schema import VIEW_COUNT_PER_VIEW
from teamster.libraries.sftp.assets import build_sftp_folder_asset
from teamster.libraries.tableau.assets import build_tableau_workbook_refresh_asset

Expand All @@ -20,11 +21,13 @@
for a in config_assets
]

traffic_to_views = build_sftp_folder_asset(
asset_key=[CODE_LOCATION, "tableau", "traffic_to_views"],
remote_dir_regex=r"/data-team/kipptaf/tableau/traffic_to_views",
remote_file_regex=r"tableau_usage_\d+\.csv",
avro_schema=...,
view_count_per_view = build_sftp_folder_asset(
asset_key=[CODE_LOCATION, "tableau", "view_count_per_view"],
remote_dir_regex=r"/data-team/kipptaf/tableau/view_count_per_view",
remote_file_regex=r".+\.csv",
file_sep="\t",
file_encoding="utf-16",
avro_schema=VIEW_COUNT_PER_VIEW,
ssh_resource_key="ssh_couchdrop",
)

Expand Down
26 changes: 6 additions & 20 deletions src/teamster/code_locations/kipptaf/tableau/schema.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,14 @@
# trunk-ignore-all(pyright/reportIncompatibleVariableOverride)
import json

import py_avro_schema

from teamster.libraries.tableau.schema import View, Workbook
from teamster.libraries.tableau.schema import ViewCountPerView


class view_record(View):
"""helper class for backwards compatibility"""


class workbook_record(Workbook):
"""helper class for backwards compatibility"""

views: list[view_record | None] | None = None


WORKBOOK_SCHEMA = json.loads(
VIEW_COUNT_PER_VIEW = json.loads(
py_avro_schema.generate(
py_type=workbook_record,
namespace="workbook",
options=py_avro_schema.Option.NO_DOC | py_avro_schema.Option.NO_AUTO_NAMESPACE,
py_type=ViewCountPerView,
options=(
py_avro_schema.Option.NO_DOC | py_avro_schema.Option.NO_AUTO_NAMESPACE
),
)
)

# remove top-level namespace for backwards compatibility
del WORKBOOK_SCHEMA["namespace"]
48 changes: 30 additions & 18 deletions src/teamster/libraries/sftp/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,19 @@ def compose_regex(
return regexp


def extract_csv_to_dict(
local_filepath: str, slugify_cols: bool, slugify_replacements: list[list[str]]
def convert_file_to_dict(
local_filepath: str,
sep: str,
encoding: str,
slugify_cols: bool,
slugify_replacements: list[list[str]] | None = None,
):
df = read_csv(filepath_or_buffer=local_filepath, low_memory=False)
if slugify_replacements is None:
slugify_replacements = []

df = read_csv(
filepath_or_buffer=local_filepath, sep=sep, encoding=encoding, low_memory=False
)

df.replace({nan: None}, inplace=True)

Expand Down Expand Up @@ -81,12 +90,14 @@ def build_sftp_file_asset(
remote_file_regex: str,
ssh_resource_key: str,
avro_schema,
slugify_cols: bool = True,
partitions_def=None,
automation_condition=None,
group_name: str | None = None,
pdf_row_pattern: str | None = None,
exclude_dirs: list[str] | None = None,
file_sep: str = ",",
file_encoding: str = "utf-8",
slugify_cols: bool = True,
slugify_replacements: list[list[str]] | None = None,
tags: dict[str, str] | None = None,
op_tags: dict | None = None,
Expand All @@ -97,9 +108,6 @@ def build_sftp_file_asset(
if exclude_dirs is None:
exclude_dirs = []

if slugify_replacements is None:
slugify_replacements = []

@asset(
key=asset_key,
metadata={
Expand Down Expand Up @@ -209,8 +217,10 @@ def _asset(context: AssetExecutionContext):
pdf_row_pattern=_check.not_none(value=pdf_row_pattern),
)
else:
records, (n_rows, _) = extract_csv_to_dict(
records, (n_rows, _) = convert_file_to_dict(
local_filepath=local_filepath,
sep=file_sep,
encoding=file_encoding,
slugify_cols=slugify_cols,
slugify_replacements=slugify_replacements,
)
Expand All @@ -234,9 +244,11 @@ def build_sftp_archive_asset(
ssh_resource_key: str,
avro_schema,
partitions_def=None,
slugify_cols: bool = True,
group_name: str | None = None,
exclude_dirs: list[str] | None = None,
slugify_cols: bool = True,
file_sep: str = ",",
file_encoding: str = "utf-8",
slugify_replacements: list[list[str]] | None = None,
tags: dict[str, str] | None = None,
op_tags: dict | None = None,
Expand All @@ -247,9 +259,6 @@ def build_sftp_archive_asset(
if exclude_dirs is None:
exclude_dirs = []

if slugify_replacements is None:
slugify_replacements = []

@asset(
key=asset_key,
metadata={
Expand Down Expand Up @@ -357,8 +366,10 @@ def _asset(context: AssetExecutionContext):
context.log.warning(msg=f"File is empty: {local_filepath}")
records, n_rows = ([{}], 0)
else:
records, (n_rows, _) = extract_csv_to_dict(
records, (n_rows, _) = convert_file_to_dict(
local_filepath=local_filepath,
sep=file_sep,
encoding=file_encoding,
slugify_cols=slugify_cols,
slugify_replacements=slugify_replacements,
)
Expand All @@ -381,9 +392,11 @@ def build_sftp_folder_asset(
ssh_resource_key: str,
avro_schema,
partitions_def=None,
slugify_cols: bool = True,
group_name: str | None = None,
exclude_dirs: list[str] | None = None,
slugify_cols: bool = True,
file_sep: str = ",",
file_encoding: str = "utf-8",
slugify_replacements: list[list[str]] | None = None,
tags: dict[str, str] | None = None,
op_tags: dict | None = None,
Expand All @@ -394,9 +407,6 @@ def build_sftp_folder_asset(
if exclude_dirs is None:
exclude_dirs = []

if slugify_replacements is None:
slugify_replacements = []

@asset(
key=asset_key,
metadata={
Expand Down Expand Up @@ -466,8 +476,10 @@ def _asset(context: AssetExecutionContext):
context.log.warning(msg=f"File is empty: {local_filepath}")
continue

records, (n_rows, _) = extract_csv_to_dict(
records, (n_rows, _) = convert_file_to_dict(
local_filepath=local_filepath,
sep=file_sep,
encoding=file_encoding,
slugify_cols=slugify_cols,
slugify_replacements=slugify_replacements,
)
Expand Down
129 changes: 129 additions & 0 deletions src/teamster/libraries/tableau/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,132 @@ class Workbook(BaseModel):
webpage_url: str | None = None

views: list[View | None] | None = None


# Record model for one row of the Tableau "view count per view" file.
#
# Every field is optional and defaults to None, so a row that is missing any
# of these columns (or has an empty value) still fits the model.
#
# NOTE(review): the attribute names are presumably the slugified column
# headers of the source file -- confirm against a real export before renaming.
# NOTE(review): this class is used as the `py_type` for Avro schema
# generation; declaration order presumably carries over into the generated
# schema, so avoid reordering fields without checking downstream consumers.
class ViewCountPerView(BaseModel):
user_name: str | None = None
default_site: str | None = None
domain_name: str | None = None
url_namespace_site_restricted_sites: str | None = None
id_hist_views: int | None = None
action_views: str | None = None
created_at_local: str | None = None
name_hist_views: str | None = None
name_hist_workbooks: str | None = None
action: str | None = None
created_at: str | None = None
url_namespace: str | None = None
id_hist_projects: int | None = None
number_of_records: int | None = None
tz_offset_days: float | None = None
sites_id: int | None = None
action_bridge: str | None = None
action_type: str | None = None
action_data: str | None = None
action_datasource: str | None = None
action_datasource_bridge: str | None = None
action_spuser: str | None = None
action_users: str | None = None
action_view: str | None = None
created_at_local_6_hour: str | None = None
email: str | None = None
extract_created: str | None = None
get_names: str | None = None
group_of_user_actions: str | None = None
hist_actor_site_id: int | None = None
hist_actor_user_id: int | None = None
hist_project_id: int | None = None
hist_target_site_id: int | None = None
hist_view_id: int | None = None
hist_workbook_id: int | None = None
historical_event_type_id: int | None = None
id: int | None = None
id_hist_sites: int | None = None
id_hist_users: int | None = None
id_hist_workbooks: int | None = None
item_name: str | None = None
l10n_accessinteract: str | None = None
l10n_active: str | None = None
l10n_original: str | None = None
l10n_publishdownload: str | None = None
l10n_subs: str | None = None
l10n_total: str | None = None
name_hist_projects: str | None = None
name_site_restricted_sites: str | None = None
project_id: int | None = None
repository_url_hist_views: str | None = None
revision_hist_views: float | None = None
revision_hist_workbooks: float | None = None
site: str | None = None
site_id: int | None = None
site_name: str | None = None
site_role_id: int | None = None
size_hist_workbooks: int | None = None
status: str | None = None
system_admin_level: int | None = None
system_user_id: int | None = None
type_id: int | None = None
user_id: int | None = None
view_hier: str | None = None
view_id: int | None = None
view_name: str | None = None
view_url: str | None = None
workbook_hier: str | None = None
workbook_id: int | None = None
workbook_url: str | None = None
worker: str | None = None

# best guess
# NOTE(review): the original author marks everything from here down as a
# best guess -- presumably these columns had no sample values to infer a type
# from; confirm against real data. Several of them are typed `str` while
# their counterparts above are numeric (e.g. id_hist_comments vs.
# id_hist_views, size_hist_flows vs. size_hist_workbooks, revision vs.
# revision_hist_views), so the types may need revisiting once values appear.
certification_note: str | None = None
comment_id: int | None = None
comment: str | None = None
content_version: str | None = None
datasource_id: int | None = None
details_last_string: str | None = None
details_status_code: str | None = None
details_trailing_detail: str | None = None
details: str | None = None
duration_in_ms: str | None = None
duration_of_refresh_seconds: str | None = None
duration_seconds: str | None = None
extract_name: str | None = None
flow_id: int | None = None
hist_capability_id: int | None = None
hist_column_id: int | None = None
hist_comment_id: int | None = None
hist_config_id: int | None = None
hist_data_connection_id: int | None = None
hist_data_role_id: int | None = None
hist_database_id: int | None = None
hist_datasource_id: int | None = None
hist_flow_id: int | None = None
hist_group_id: int | None = None
hist_licensing_role_id_hist_users: str | None = None
hist_licensing_role_id: int | None = None
hist_metric_id: int | None = None
hist_published_connection_id: int | None = None
hist_remote_agent_id: int | None = None
hist_schedule_id: int | None = None
hist_table_id: int | None = None
hist_tag_id: int | None = None
hist_target_user_id: int | None = None
hist_task_id: int | None = None
id_hist_comments: str | None = None
id_hist_datasources: str | None = None
id_hist_flows: str | None = None
id_hist_metrics: str | None = None
id_hist_remote_agents: str | None = None
is_certified: str | None = None
metric_id: int | None = None
name_hist_datasources: str | None = None
name_hist_flows: str | None = None
name_hist_metrics: str | None = None
publisher_tristate: str | None = None
remote_agent_id: int | None = None
repository_url: str | None = None
revision: str | None = None
site_admin_level: int | None = None
size_hist_flows: str | None = None
size: str | None = None
tableau_bridge_client_name: str | None = None
using_remote_query_agent: str | None = None
4 changes: 2 additions & 2 deletions tests/assets/sftp/test_assets_sftp_couchdrop.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,6 @@ def test_pearson_student_list_report_kippnewark():


def test_tableau_traffic_to_views_kipptaf():
from teamster.code_locations.kipptaf.tableau.assets import traffic_to_views
from teamster.code_locations.kipptaf.tableau.assets import view_count_per_view

_test_asset(asset=traffic_to_views)
_test_asset(asset=view_count_per_view)
45 changes: 0 additions & 45 deletions tests/assets/test_tableau_assets.py

This file was deleted.

0 comments on commit ddea977

Please sign in to comment.