Module for sending datasets to Google cloud #18

Open · wants to merge 10 commits into base: dev
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -15,6 +15,9 @@ seaborn = ">=0.11,<0.12"
scipy = ">=1.5,<1.9"
yfinance = ">=0.1.55,<0.2.0"
starknet_py = "*"
google-cloud-bigquery = "^3.0.0"
google-cloud-storage = "^2.8.0"
pyyaml = "^6.0"

[build-system]
requires = ["poetry-core>=1.0.0"]
90 changes: 90 additions & 0 deletions src/data/GCS-Dataset-Upload-Manager.md
@@ -0,0 +1,90 @@
# Google Cloud Storage Dataset Manager

This Python module provides a class for managing datasets in Google Cloud Storage (GCS), with functionality for uploading, reading, updating, deleting, and listing datasets in formats such as **JSON**, **CSV**, and **Pickle**.

## Features

- **Upload Dataset**: Upload data in various formats (CSV, JSON, Pickle) to Google Cloud Storage.
- **Read Dataset**: Read and download datasets from Google Cloud Storage.
- **Update Dataset**: Update (overwrite) an existing dataset in the cloud.
- **Delete Dataset**: Delete a dataset from the cloud.
- **List Datasets**: List all datasets stored in a bucket with optional filtering by prefix.

## Prerequisites

Before you start using this module, ensure you have the following:

1. **Google Cloud Account**: You must have a Google Cloud project and a GCS bucket to store your datasets.
2. **Google Cloud SDK**: Install and configure the Google Cloud SDK, including setting up authentication via service accounts.
3. **Required Python Packages**:
   - `google-cloud-storage`: Library to interact with Google Cloud Storage.
   - `pandas`: For handling and manipulating datasets (especially CSV handling).

### Installation

1. **Install Python dependencies**:
   - Install the required libraries using pip:

     ```bash
     pip install google-cloud-storage pandas
     ```

2. **Set up Google Cloud Credentials**:
   - Authenticate your Google Cloud access by setting the `GOOGLE_APPLICATION_CREDENTIALS` environment variable:

     ```bash
     export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-file.json"
     ```

   - Replace `/path/to/your/service-account-file.json` with the path to your service account JSON file.
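
If the credentials are picked up correctly, a quick sanity check such as the sketch below should list the buckets visible to the service account (the `strkfarm` project name is an assumption taken from the defaults described later in this document):

```python
from google.cloud import storage

# Fails fast if GOOGLE_APPLICATION_CREDENTIALS is missing or points to an invalid key file.
client = storage.Client(project="strkfarm")
print([bucket.name for bucket in client.list_buckets()])
```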

## Module Overview

The **`Datasets`** class includes methods for uploading, reading, updating, deleting, and listing datasets in your Google Cloud Storage bucket.

### Constants

- `DEFAULT_PROJECT_ID`: The Google Cloud project ID (default: `"strkfarm"`).
- `DEFAULT_BUCKET_NAME`: The GCS bucket name (default: `"strkfarm"`).

### Class: `Datasets`

This class allows you to interact with your GCS bucket and perform dataset management tasks.

#### Methods:

1. **`__init__(self, project_id: str, bucket_name: str)`**:
   - Initializes the `Datasets` class with Google Cloud credentials and bucket information.

2. **`upload_dataset(self, data, filename, data_format='json')`**:
   - Uploads a dataset to Google Cloud Storage in the specified format.
   - Supported formats: `json`, `csv`, `pickle`.

3. **`read_dataset(self, filename, data_format='json', as_dataframe=False)`**:
   - Reads a dataset from Google Cloud Storage in the specified format and returns the data.
   - Returns a pandas DataFrame if the data is in CSV format and `as_dataframe=True`.

4. **`update_dataset(self, data, filename, data_format='json')`**:
   - Updates (overwrites) an existing dataset in Google Cloud Storage.

5. **`delete_dataset(self, filename)`**:
   - Deletes a dataset from Google Cloud Storage.

6. **`list_datasets(self, prefix=None)`**:
   - Lists all datasets in the GCS bucket, optionally filtering by prefix.
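
Below is a minimal usage sketch based on the method signatures listed above; the bucket, project, and object names are illustrative, and the import assumes the class is defined in `datasets2.py` (adjust the module path to your layout):

```python
from datasets2 import Datasets  # assumed module name; adjust to where the class lives

# Instantiate with the default project and bucket described above.
datasets = Datasets(project_id="strkfarm", bucket_name="strkfarm")

# Upload a small JSON dataset.
datasets.upload_dataset({"pool": "ETH/USDC", "apy": 0.12}, "examples/sample.json", data_format="json")

# Read it back, list what is stored under the prefix, then clean up.
data = datasets.read_dataset("examples/sample.json", data_format="json")
print(data)
print(datasets.list_datasets(prefix="examples/"))
datasets.delete_dataset("examples/sample.json")
```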



## Running the Code

1. **Set up the environment**:
   - Ensure the Google Cloud SDK is installed and configured.
   - Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to point to your service account key file:

     ```bash
     export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-file.json"
     ```

2. **Run the Python script**:

   ```bash
   python3 datasets2.py
   ```

128 changes: 128 additions & 0 deletions src/data/datasets.py
@@ -0,0 +1,128 @@
import functions_framework # type: ignore
import logging
# Review comment (Collaborator): Please add comments throughout.
import os
import traceback
import re

from google.cloud import bigquery
from google.cloud import storage

import yaml # type: ignore

# Load schema configuration from a YAML file
with open("./schemas.yaml") as schema_file:
    config = yaml.safe_load(schema_file)
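
# NOTE: schemas.yaml is not part of this diff; the code below assumes it yields a list of
# table definitions shaped roughly like this (illustrative example only):
#
#   - name: pool_metrics              # matched against the uploaded file name
#     format: NEWLINE_DELIMITED_JSON
#     schema:
#       - name: pool
#         type: STRING
#         mode: REQUIRED
#       - name: apy
#         type: FLOAT
#         mode: NULLABLE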

# Constants
PROJECT_ID = os.getenv('strkfarm') # Environment variable for project ID
BQ_DATASET = 'staging'
CLOUD_STORAGE_CLIENT = storage.Client()
BIGQUERY_CLIENT = bigquery.Client()
JOB_CONFIG = bigquery.LoadJobConfig()

def streaming(data):
"""
Process a file uploaded to a Cloud Storage bucket and stream its data to BigQuery.

Args:
data (dict): Event data containing bucket name, file name, and time created.
"""
bucket_name = data['bucket']
print("Bucket name:", bucket_name)
file_name = data['name']
print("File name:", file_name)
time_created = data['timeCreated']
print("Time Created:", time_created)

try:
for table in config:
table_name = table.get('name')
if re.search(table_name.replace('_', '-'), file_name) or re.search(table_name, file_name):
table_schema = table.get('schema')
_check_if_table_exists(table_name, table_schema)
table_format = table.get('format')

if table_format == 'NEWLINE_DELIMITED_JSON':
_load_table_from_uri(bucket_name, file_name, table_schema, table_name)
except Exception:
print('Error streaming file. Cause: %s' % (traceback.format_exc()))

def _check_if_table_exists(table_name, table_schema):
"""
Check if a BigQuery table exists; if not, create it.

Args:
table_name (str): Name of the table to check or create.
table_schema (list): Schema of the table defined in YAML.
"""
table_id = BIGQUERY_CLIENT.dataset(BQ_DATASET).table(table_name)

try:
BIGQUERY_CLIENT.get_table(table_id)
except Exception:
logging.warning('Creating table: %s' % table_name)
schema = create_schema_from_yaml(table_schema)
table = bigquery.Table(table_id, schema=schema)
table = BIGQUERY_CLIENT.create_table(table)
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))

def _load_table_from_uri(bucket_name, file_name, table_schema, table_name):
"""
Load data from a Cloud Storage file into a BigQuery table.

Args:
bucket_name (str): Name of the Cloud Storage bucket.
file_name (str): Name of the file in the bucket.
table_schema (list): Schema of the BigQuery table.
table_name (str): Name of the BigQuery table.
"""
uri = f'gs://{bucket_name}/{file_name}'
table_id = BIGQUERY_CLIENT.dataset(BQ_DATASET).table(table_name)

schema = create_schema_from_yaml(table_schema)
JOB_CONFIG.schema = schema
JOB_CONFIG.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
JOB_CONFIG.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

load_job = BIGQUERY_CLIENT.load_table_from_uri(uri, table_id, job_config=JOB_CONFIG)
load_job.result()
print("Job finished.")

def create_schema_from_yaml(table_schema):
"""
Create a BigQuery schema from YAML configuration.

Args:
table_schema (list): Schema configuration in YAML format.

Returns:
list: List of BigQuery SchemaField objects.
"""
schema = []
for column in table_schema:
schema_field = bigquery.SchemaField(column['name'], column['type'], column['mode'])
schema.append(schema_field)

if column['type'] == 'RECORD':
schema_field._fields = create_schema_from_yaml(column['fields'])
return schema
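
# The Cloud Functions entry point below receives a CloudEvent whose .data payload for a
# storage "object finalized" event includes, among other fields (illustrative values):
#   {
#       "bucket": "strkfarm",
#       "name": "pool_metrics-2024-01-01.json",
#       "metageneration": "1",
#       "timeCreated": "2024-01-01T00:00:00.000Z",
#       "updated": "2024-01-01T00:00:00.000Z"
#   }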

@functions_framework.cloud_event
def hello_gcs(cloud_event):
"""
Triggered by a change in a Cloud Storage bucket.

Args:
cloud_event (CloudEvent): Event data for the trigger.
"""
data = cloud_event.data

print(f"Event ID: {cloud_event['id']}")
print(f"Event type: {cloud_event['type']}")
print(f"Bucket: {data['bucket']}")
print(f"File: {data['name']}")
print(f"Metageneration: {data['metageneration']}")
print(f"Created: {data['timeCreated']}")
print(f"Updated: {data['updated']}")

streaming(data)