
Commit

basedosdados 2.0.0b26 fixes
aspeddro committed Jan 23, 2025
1 parent 579ce37 commit d9d09c1
Showing 14 changed files with 22 additions and 25 deletions.
4 changes: 2 additions & 2 deletions pipelines/datasets/botdosdados/tasks.py
@@ -12,7 +12,6 @@
import pandas as pd
#import seaborn as sns
import tweepy
- from basedosdados.download.metadata import _safe_fetch
from prefect import task
from tweepy.auth import OAuthHandler

@@ -104,7 +103,8 @@ def was_table_updated(page_size: int, hours: int, subset: str, wait=None) -> bool:
selected_datasets = list(datasets_links.keys())

url = f"https://basedosdados.org/api/3/action/bd_dataset_search?q=&resource_type=bdm_table&page=1&page_size={page_size}"
- response = _safe_fetch(url)
+ return False
+ # response = _safe_fetch(url)
json_response = response.json()
datasets = json_response["result"]["datasets"]
n_datasets = len(datasets)
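
Note: the early return False above effectively disables was_table_updated, since _safe_fetch is gone from basedosdados 2.0. A minimal stand-in sketch using plain requests (the helper name and error handling are assumptions, not part of this commit) could restore the check:

    import requests

    def safe_fetch(url: str, timeout: int = 30) -> requests.Response:
        # Hypothetical replacement for the removed _safe_fetch helper.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # fail loudly instead of returning a broken response
        return response

With such a helper, response = safe_fetch(url) would let the rest of the function run unchanged.
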
1 change: 0 additions & 1 deletion pipelines/datasets/br_bd_indicadores/flows.py
@@ -38,7 +38,6 @@
create_table_and_upload_to_gcs,
get_current_flow_labels,
rename_current_flow_run_dataset_table,
- update_metadata,
)

with Flow(
(another changed file; name not captured)
@@ -24,7 +24,6 @@
create_table_and_upload_to_gcs,
get_current_flow_labels,
rename_current_flow_run_dataset_table,
- update_metadata,
)

ROOT = "/tmp/data"
1 change: 0 additions & 1 deletion pipelines/datasets/br_fgv_igp/flows.py
@@ -31,7 +31,6 @@
get_current_flow_labels,
get_temporal_coverage,
rename_current_flow_run_dataset_table,
- update_metadata,
)

ROOT = Path("tmp/data")
1 change: 0 additions & 1 deletion pipelines/datasets/br_poder360_pesquisas/flows.py
@@ -22,7 +22,6 @@
get_current_flow_labels,
get_temporal_coverage,
rename_current_flow_run_dataset_table,
- update_metadata,
)

# pylint: disable=C0103
1 change: 0 additions & 1 deletion pipelines/datasets/br_sp_saopaulo_dieese_icv/flows.py
@@ -21,7 +21,6 @@
create_table_and_upload_to_gcs,
get_current_flow_labels,
rename_current_flow_run_dataset_table,
- update_metadata,
)

with Flow(
7 changes: 4 additions & 3 deletions pipelines/datasets/cross_update/utils.py
@@ -6,6 +6,7 @@

import pandas as pd
from pandas import json_normalize
+ from basedosdados import Backend

from pipelines.utils.metadata.utils import get_headers
from pipelines.utils.utils import log
@@ -41,7 +42,7 @@ def batch(lst, n):
yield lst[i : i + n]


- def find_closed_tables(backend):
+ def find_closed_tables(backend: Backend):
query = """
query {
allCoverage(isClosed: false, table_Id_Isnull:false, datetimeRanges_Id_Isnull:false) {
@@ -74,7 +75,7 @@ def find_closed_tables(backend):
}"""

response = backend._execute_query(query=query)
- response = backend._simplify_graphql_response(response)["allCoverage"]
+ response = backend._simplify_response(response)["allCoverage"]
data = json_normalize(response)
open_tables = data["table._id"].tolist()

@@ -104,7 +105,7 @@ def find_closed_tables(backend):
}"""

response = backend._execute_query(query=query)
- response = backend._simplify_graphql_response(response)["allCoverage"]
+ response = backend._simplify_response(response)["allCoverage"]
data = json_normalize(response)
closed_tables = data["table._id"].tolist()

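
Note: beyond the new Backend import and type hint, the change in this file is the helper rename (_simplify_graphql_response -> _simplify_response). A compact sketch of the call pattern used above, with every name taken from this diff (both methods are Backend internals, so this leans on private API):

    from basedosdados import Backend
    from pandas import json_normalize

    def run_graphql(backend: Backend, query: str, root: str):
        # Same pattern as find_closed_tables: execute, simplify, normalize.
        response = backend._execute_query(query=query)
        return json_normalize(backend._simplify_response(response)[root])
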
1 change: 1 addition & 0 deletions pipelines/datasets/test_pipeline/tasks.py
@@ -65,6 +65,7 @@ def upload_to_gcs(path: Union[str, Path], dataset_id: str, table_id: str) -> None:
tb = bd.Table(dataset_id=dataset_id, table_id=table_id)
# st = bd.Storage(dataset_id=dataset_id, table_id=table_id)

+ # TODO(aspeddro): this method dont exists
if tb.table_existime_stamp(mode="staging"):
# Delete old data
# st.delete_table(
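
Note: the TODO above flags that this staging-existence check calls a method that is not available in basedosdados 2.0. One hedged alternative is to ask BigQuery directly; the staging dataset naming below is an illustration only, not taken from this repo:

    from google.api_core.exceptions import NotFound
    from google.cloud import bigquery

    def staging_table_exists(project_id: str, dataset_id: str, table_id: str) -> bool:
        # Assumes staging tables live in a "{dataset_id}_staging" dataset;
        # adjust to whatever convention the project actually uses.
        client = bigquery.Client(project=project_id)
        try:
            client.get_table(f"{project_id}.{dataset_id}_staging.{table_id}")
            return True
        except NotFound:
            return False
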
6 changes: 3 additions & 3 deletions pipelines/utils/dump_to_gcs/tasks.py
@@ -8,8 +8,8 @@
from basedosdados import backend as bd
from pipelines.utils.metadata.utils import get_api_most_recent_date, get_url
import jinja2
- from basedosdados.download.base import google_client
- from basedosdados.upload.base import Base
+ from basedosdados.download.download import _google_client
+ from basedosdados.core.base import Base
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from prefect import task
@@ -93,7 +93,7 @@ def download_data_to_gcs( # pylint: disable=R0912,R0913,R0914,R0915
)

# pylint: disable=E1124
- client = google_client(billing_project_id, from_file=True, reauth=False)
+ client = _google_client(billing_project_id, from_file=True, reauth=False)

bq_table_ref = TableReference.from_string(f'{project_id}.{dataset_id}.{table_id}')
bq_table = client["bigquery"].get_table(table = bq_table_ref)
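
Note: several files in this commit need the same two import moves. A version-tolerant shim, sketched only from the old and new paths shown in this diff, could keep the code importable against either release:

    try:
        # basedosdados >= 2.0
        from basedosdados.download.download import _google_client as google_client
        from basedosdados.core.base import Base
    except ImportError:
        # basedosdados 1.x
        from basedosdados.download.base import google_client
        from basedosdados.upload.base import Base
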
6 changes: 3 additions & 3 deletions pipelines/utils/dump_to_gcs/utils.py
@@ -1,13 +1,13 @@
# -*- coding: utf-8 -*-
from basedosdados import backend as bd
- from basedosdados.download.base import google_client
+ from basedosdados.download.download import _google_client
from time import sleep
from pipelines.utils.utils import log
from google.cloud import bigquery
from google.cloud.bigquery import TableReference

def execute_query_in_bigquery(billing_project_id, query, path, location):
- client = google_client(billing_project_id, from_file=True, reauth=False)
+ client = _google_client(billing_project_id, from_file=True, reauth=False)
job = client["bigquery"].query(query)
while not job.done():
sleep(1)
@@ -28,4 +28,4 @@ def execute_query_in_bigquery(billing_project_id, query, path, location):
location=location,
job_config=job_config,
)
- return extract_job.result()
\ No newline at end of file
+ return extract_job.result()
4 changes: 2 additions & 2 deletions pipelines/utils/metadata/utils.py
@@ -9,7 +9,7 @@
from datetime import datetime, time
from time import sleep
from typing import Dict, Tuple
- from basedosdados.download.base import google_client
+ from basedosdados.download.download import _google_client
from dateutil.relativedelta import relativedelta

from pipelines.constants import constants as pipeline_constants
@@ -435,7 +435,7 @@ def update_row_access_policy(
date_format: str,
free_parameters: dict,
) -> None:
- client = google_client(billing_project_id, from_file=True, reauth=False)
+ client = _google_client(billing_project_id, from_file=True, reauth=False)

query_bdpro_access = f'CREATE OR REPLACE ROW ACCESS POLICY bdpro_filter ON `{project_id}.{dataset_id}.{table_id}` GRANT TO ("group:[email protected]", "group:[email protected]") FILTER USING (TRUE)'
job = client["bigquery"].query(query_bdpro_access)
2 changes: 1 addition & 1 deletion pipelines/utils/tasks.py
@@ -192,7 +192,7 @@ def create_table_and_upload_to_gcs(
# pylint: disable=C0301
log("STEP UPLOAD: Table does not exist in STAGING, need to create first")


+ # TODO(aspeddro): dead code
@task(
max_retries=constants.TASK_MAX_RETRIES.value,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
4 changes: 2 additions & 2 deletions scripts/link_directory_metadata.py
@@ -43,7 +43,7 @@ def get_columns(column_name: str, backend: bd.Backend) -> pd.DataFrame:
variables = {"column_name": column_name}

response = backend._execute_query(query=query, variables=variables)
- response = backend._simplify_graphql_response(response)["allColumn"]
+ response = backend._simplify_response(response)["allColumn"]

df = pd.json_normalize(response)

@@ -85,7 +85,7 @@ def get_directory_column_id(

variables = {"column_name": directory_column_name}
response = backend._execute_query(query=query, variables=variables)
- response = backend._simplify_graphql_response(response)["allColumn"]
+ response = backend._simplify_response(response)["allColumn"]
df = pd.json_normalize(response)

colunas_de_diretorio = df["table.dataset.fullSlug"].str.contains("diretorios")
8 changes: 4 additions & 4 deletions scripts/upload_columns.py
@@ -67,7 +67,7 @@ def get_column_id(table_id: str, column_name: str, backend: b.Backend):
}}"""

data = backend._execute_query(query=query)
- data = backend._simplify_graphql_response(response=data)["allColumn"]
+ data = backend._simplify_response(response=data)["allColumn"]
if data:
return data[0]["_id"]
else:
@@ -88,7 +88,7 @@ def get_n_columns(table_id, backend: b.Backend):
}}"""

data = backend._execute_query(query=query)
- data = backend._simplify_graphql_response(response=data)["allTable"]
+ data = backend._simplify_response(response=data)["allTable"]

return data[0]["columns"]["edgeCount"]

@@ -110,7 +110,7 @@ def get_bqtype_dict(backend: b.Backend):
data = backend._execute_query(query=query)

# Simplify the GraphQL response to extract the relevant data
- data = backend._simplify_graphql_response(response=data)["allBigquerytype"]
+ data = backend._simplify_response(response=data)["allBigquerytype"]

# Create a dictionary where the 'name' part is the key and the '_id' is the value
bqtype_dict = {item["name"]: item["_id"] for item in data}
@@ -147,7 +147,7 @@ def get_all_columns_id(table_id: str, backend: b.Backend):
}}"""

data = backend._execute_query(query=query)
- columns_json = backend._simplify_graphql_response(response=data)["allColumn"]
+ columns_json = backend._simplify_response(response=data)["allColumn"]

if data:
columns_list = [col["_id"] for col in columns_json]
