Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(component): Created Snowflake data unload component #11349

Merged
merged 2 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions components/snowflake/component.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# PIPELINE DEFINITION
# Name: snowflake-unload-op
# Description: Run COPY in snowflake to unload data to GCS bucket.
# output_gcs_path: the location to land the data
# sf_storage_integration: the Snowflake Storage Integration name
# query: the query to execute in the COPY command
# sf_user: the snowflake username
# sf_password: the snowflake password
# sf_warehouse: the snowflake warehouse name
# sf_database: the database to use
# Inputs:
# output_gcs_path: str
# query: str
# sf_account: str
# sf_database: str
# sf_password: str
# sf_storage_integration: str
# sf_user: str
# sf_warehouse: str
# Outputs:
# Output: str
components:
comp-snowflake-unload-op:
executorLabel: exec-snowflake-unload-op
inputDefinitions:
parameters:
output_gcs_path:
parameterType: STRING
query:
parameterType: STRING
sf_account:
parameterType: STRING
sf_database:
parameterType: STRING
sf_password:
parameterType: STRING
sf_storage_integration:
parameterType: STRING
sf_user:
parameterType: STRING
sf_warehouse:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
deploymentSpec:
executors:
exec-snowflake-unload-op:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- snowflake_unload_op
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'snowflake-connector-python:3.12.3'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)


printf "%s" "$0" > "$program_path/ephemeral_component.py"

_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef snowflake_unload_op(\n output_gcs_path: str,\n sf_storage_integration:\
\ str,\n query: str,\n sf_user: str,\n sf_password: str,\n sf_account:\
\ str,\n sf_warehouse: str,\n sf_database: str\n ) -> str:\n \
\ \"\"\"\n Run COPY in snowflake to unload data to GCS bucket.\n\n \
\ output_gcs_path: the location to land the data\n sf_storage_integration:\
\ the Snowflake Storage Integration name\n query: the query to execute\
\ in the COPY command\n sf_user: the snowflake username\n sf_password:\
\ the snowflake password\n sf_warehouse: the snowflake warehouse name\n\
\ sf_database: the database to use\n \"\"\"\n import snowflake.connector\n\
\ conn = snowflake.connector.connect(user=sf_user,\n \
\ password=sf_password,\n \
\ account=sf_account,\n \
\ role=\"ACCOUNTADMIN\")\n\n conn.cursor().execute(f\"USE WAREHOUSE\
\ {sf_warehouse};\")\n conn.cursor().execute(f\"USE DATABASE {sf_database};\"\
)\n result = conn.cursor().execute(f\"\"\"\n COPY INTO 'gcs://{output_gcs_path}'\n\
\ FROM ({query})\n FILE_FORMAT = (TYPE = CSV COMPRESSION=NONE)\n \
\ STORAGE_INTEGRATION = {sf_storage_integration}\n HEADER = TRUE\n\
\ \"\"\")\n _ = result.fetchall()\n if output_gcs_path.endswith(\"\
/\"):\n return output_gcs_path + \"data_0_0_0.csv\"\n else:\n\
\ return output_gcs_path\n\n"
image: python:3.11
pipelineInfo:
name: snowflake-unload-op
root:
dag:
outputs:
parameters:
Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: snowflake-unload-op
tasks:
snowflake-unload-op:
cachingOptions:
enableCache: true
componentRef:
name: comp-snowflake-unload-op
inputs:
parameters:
output_gcs_path:
componentInputParameter: output_gcs_path
query:
componentInputParameter: query
sf_account:
componentInputParameter: sf_account
sf_database:
componentInputParameter: sf_database
sf_password:
componentInputParameter: sf_password
sf_storage_integration:
componentInputParameter: sf_storage_integration
sf_user:
componentInputParameter: sf_user
sf_warehouse:
componentInputParameter: sf_warehouse
taskInfo:
name: snowflake-unload-op
inputDefinitions:
parameters:
output_gcs_path:
parameterType: STRING
query:
parameterType: STRING
sf_account:
parameterType: STRING
sf_database:
parameterType: STRING
sf_password:
parameterType: STRING
sf_storage_integration:
parameterType: STRING
sf_user:
parameterType: STRING
sf_warehouse:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0
56 changes: 56 additions & 0 deletions components/snowflake/snowflake_unload_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
This is a KFP component doing "unload data to GCS bucket" operation
from the Snowflake database.
"""
from kfp import compiler
from kfp.dsl import component

@component(
base_image="python:3.11",
packages_to_install=["snowflake-connector-python:3.12.3"]
)
def snowflake_unload_op(
output_gcs_path: str,
sf_storage_integration: str,
query: str,
sf_user: str,
sf_password: str,
sf_account: str,
sf_warehouse: str,
sf_database: str
) -> str:
"""
Run COPY in snowflake to unload data to GCS bucket.

output_gcs_path: the location to land the data
sf_storage_integration: the Snowflake Storage Integration name
query: the query to execute in the COPY command
sf_user: the snowflake username
sf_password: the snowflake password
sf_warehouse: the snowflake warehouse name
sf_database: the database to use
"""
import snowflake.connector
conn = snowflake.connector.connect(user=sf_user,
password=sf_password,
account=sf_account,
role="ACCOUNTADMIN")

conn.cursor().execute(f"USE WAREHOUSE {sf_warehouse};")
conn.cursor().execute(f"USE DATABASE {sf_database};")
result = conn.cursor().execute(f"""
COPY INTO 'gcs://{output_gcs_path}'
FROM ({query})
FILE_FORMAT = (TYPE = CSV COMPRESSION=NONE)
STORAGE_INTEGRATION = {sf_storage_integration}
HEADER = TRUE
""")
_ = result.fetchall()
if output_gcs_path.endswith("/"):
return output_gcs_path + "data_0_0_0.csv"
else:
return output_gcs_path

if __name__ == "__main__":
compiler.Compiler().compile(pipeline_func=snowflake_unload_op,
package_path="component.yaml")
Loading