Skip to content

Commit

Permalink
Release/551 (#1605)
Browse files Browse the repository at this point in the history
* wait until UDF ready feature, automated docker repo login  and minor … (#1589)

* wait until UDF ready feature, automated docker repo login  and minor refactors

* remove credentials log

* less verbose logging

* snowflake UDF Creation tutorial notebook added

* snowflake docs update

* update nb

* bump version

* bump version
  • Loading branch information
C-K-Loan authored Nov 17, 2024
1 parent 0acfa79 commit 921f1ad
Show file tree
Hide file tree
Showing 7 changed files with 619 additions and 47 deletions.
18 changes: 18 additions & 0 deletions docs/en/jsl/jsl_release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,24 @@ sidebar:
See [Github Releases](https://github.com/JohnSnowLabs/johnsnowlabs/releases) for detailed information on Release History and Features


## 5.5.1
Release date: 11-17-2024

The John Snow Labs 5.5.1 Library released with the following pre-installed and recommended dependencies

{:.table-model-big}
| Library | Version |
|-----------------------------------------------------------------------------------------|------------|
| [Visual NLP](https://nlp.johnsnowlabs.com/docs/en/spark_ocr_versions/ocr_release_notes) | `5.4.1` |
| [Enterprise NLP](https://nlp.johnsnowlabs.com/docs/en/licensed_annotators) | `5.5.0` |
| [Finance NLP](https://nlp.johnsnowlabs.com/docs/en/financial_release_notes) | `1.X.X` |
| [Legal NLP](https://nlp.johnsnowlabs.com/docs/en/legal_release_notes) | `1.X.X` |
| [NLU](https://github.com/JohnSnowLabs/nlu/releases) | `5.4.1` |
| [Spark-NLP-Display](https://sparknlp.org/docs/en/display) | `5.0` |
| [Spark-NLP](https://github.com/JohnSnowLabs/spark-nlp/releases/) | `5.5.0` |
| [Pyspark](https://spark.apache.org/docs/latest/api/python/) | `3.4.0` |



## 5.5.0
Release date: 10-23-2024
Expand Down
3 changes: 2 additions & 1 deletion docs/en/jsl/snowflake_utils.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ sidebar:
---
<div class="main-docs" markdown="1">

You can easily deploy any John Snow Labs models within the Snowpark Container Services Ecosystem via `nlp.deploy_as_snowflake_udf()`
You can easily deploy any John Snow Labs models within the Snowpark Container Services Ecosystem via `nlp.deploy_as_snowflake_udf()`
The [Create Snowflake Endpoint Notebook](https://github.com/JohnSnowLabs/johnsnowlabs/blob/main/notebooks/create_snowflake_endpoint.ipynb) and this documentation page showcase how you can create a containerized Snowflake UDF from any Johnsnowlabs and how to query it.


## Setup Snowflake Resources
Expand Down
12 changes: 6 additions & 6 deletions johnsnowlabs/auto_install/docker/work_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
def check_image_exist(image_name: str) -> bool:
cmd = f"docker image inspect {image_name}"
return run_cmd_and_check_succ(
[cmd], shell=True, raise_on_fail=False, use_code=True, log=False
[cmd], shell=True, raise_on_fail=False, use_code=True, log=False, log_outputs=False
)


def check_container_exist(container_name: str) -> bool:
cmd = f"docker container inspect {container_name}"
return run_cmd_and_check_succ(
[cmd], shell=True, raise_on_fail=False, use_code=True, log=False
[cmd], shell=True, raise_on_fail=False, use_code=True, log=False, log_outputs=False
)


Expand All @@ -36,9 +36,9 @@ def _destroy_container(container_name: str = None):
stop_cmd = f"docker container stop {container_name}"
rm_cmd = f"docker container rm -f {container_name}"
run_cmd_and_check_succ(
[stop_cmd], shell=True, raise_on_fail=False, use_code=True
[stop_cmd], shell=True, raise_on_fail=False, use_code=True,log_outputs=False
)
run_cmd_and_check_succ([rm_cmd], shell=True, raise_on_fail=False, use_code=True)
run_cmd_and_check_succ([rm_cmd], shell=True, raise_on_fail=False, use_code=True,log_outputs=False)
print(f"Container '{container_name}' destroyed.")
else:
print(f"Container '{container_name}' does not exist.")
Expand All @@ -48,7 +48,7 @@ def _destroy_image(image_name: str = None):
image_name = settings.docker_image_name if image_name is None else image_name
if check_image_exist(image_name):
rm_cmd = f"docker image rm -f {image_name}"
run_cmd_and_check_succ([rm_cmd], shell=True, raise_on_fail=False, use_code=True)
run_cmd_and_check_succ([rm_cmd], shell=True, raise_on_fail=False, use_code=True, log_outputs=False)
print(f"Image '{image_name}' destroyed.")
else:
print(f"Tried to destroy image '{image_name}', but does not exist.")
Expand Down Expand Up @@ -239,7 +239,7 @@ def build_image(
nlp=nlp,
hardware_platform=hardware_platform,
)
run_cmd_and_check_succ([cmd], shell=True, raise_on_fail=True, use_code=True)
run_cmd_and_check_succ([cmd], shell=True, raise_on_fail=True, use_code=True, log_outputs=False)


def run_container_cmd(container_name=None, image_name=None, destroy_container=False):
Expand Down
101 changes: 71 additions & 30 deletions johnsnowlabs/auto_install/snowflake/work_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time

from johnsnowlabs import nlp
from johnsnowlabs.auto_install.docker.work_utils import check_local_endpoint_health, _destroy_container
from johnsnowlabs.auto_install.docker.work_utils import check_local_endpoint_health, _destroy_container, _destroy_image
from johnsnowlabs.utils.py_process import run_cmd_and_check_succ


Expand Down Expand Up @@ -99,10 +99,40 @@ def push_snowflake_image(remote_repo, image_name):
)


def wait_until_service_created(client, service, timeout=60*20):
cur = client.cursor()
start_time = time.time() # Start time tracking

while True:
elapsed_time = time.time() - start_time # Calculate elapsed time
if elapsed_time > timeout:
raise TimeoutError(f"Timeout reached: Service {service} did not reach RUNNING state within {timeout} seconds.")

res = cur.execute(f'DESCRIBE SERVICE {service};').fetchone()
state = res[1]

if state == 'RUNNING':
return True
elif state == 'PENDING':
pass
elif state in ['FAILED', 'DONE', 'SUSPENDING', 'SUSPENDED', 'DELETING', 'DELETED', 'INTERNAL_ERROR']:
raise Exception(f"State of service {service} is {state}; cannot continue.")

print(f"State of {service} is {state}. Waiting for service creation...")
time.sleep(60)



def create_service(client, service_name, compute_pool_name, image_path, role, database, warehouse, schema):
cur = client.cursor()
cmd = get_service_create_cmd(service_name, compute_pool_name, image_path, role, database, warehouse, schema)
cur.execute(cmd, num_statements=cmd.count(';'))
try:
cur.execute(cmd, num_statements=cmd.count(';'))
except Exception as e:
if 'already exist' in e.raw_msg:
print(f'A resource already exists, see error message for more details')
print(e.raw_msg)
raise Exception("Snowflake Service creation failed see error message for more details")
for row in cur:
print(row)
cur.close()
Expand Down Expand Up @@ -146,17 +176,28 @@ def tag_image(image_name, remote_repo):


def build_test_and_push_image(nlu_ref, license_path, image_name, local_test_container_name, local_test_port,
remote_repo):
# build image, test it locally, tag it, push it and destroy the local image
# TODO check while pushing if not authorized/logged in fail or not
login_cmd = f'docker login {remote_repo}'
remote_repo, user, password, destroy_image=False):
login_cmd = f"echo {password} | docker login {remote_repo} -u {user} --password-stdin"

# 1. build image,
build_snowflake_image(nlu_ref, image_name, license_path)

# 2. test it locally
test_snowflake_image_local(image_name, local_test_container_name, local_test_port)

# 3. tag it
tag_image(image_name, remote_repo)
# TODO TEST IF ACTUALLY LOGGED IN !!

# 4. login to docker repo,
print(f'Logging into repo {remote_repo} with user {user}')
run_cmd_and_check_succ(login_cmd, text=True, shell=True, use_code=True, log=False)

# 5. push image
push_snowflake_image(remote_repo, image_name)
# _destroy_image(image_name)

# 6. destroy the local image
if destroy_image:
_destroy_image(image_name)


def get_service_logs(snowflake_user, snowflake_password, snowflake_account, warehouse_name, database_name,
Expand Down Expand Up @@ -192,7 +233,7 @@ def snowflake_common_setup(snowflake_user, snowflake_account, snowflake_password
# todo params for warehouse_name size and compute-pool
base_cmd = f"""
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE {warehouse_name};
CREATE ROLE IF NOT EXISTS {role_name};
CREATE DATABASE IF NOT EXISTS {db_name};
Expand All @@ -201,10 +242,6 @@ def snowflake_common_setup(snowflake_user, snowflake_account, snowflake_password
CREATE OR REPLACE WAREHOUSE {warehouse_name} WITH WAREHOUSE_SIZE='X-SMALL';
GRANT USAGE ON WAREHOUSE {warehouse_name} TO ROLE {role_name};
CREATE SECURITY INTEGRATION IF NOT EXISTS snowservices_ingress_oauth
TYPE=oauth
OAUTH_CLIENT=snowservices_ingress
ENABLED=true;
GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE {role_name};
Expand All @@ -221,16 +258,16 @@ def snowflake_common_setup(snowflake_user, snowflake_account, snowflake_password

is_snowflake_installed()
import snowflake.connector
c = snowflake.connector.connect(user=snowflake_user, password=snowflake_password, account=snowflake_account)
c = snowflake.connector.connect(user=snowflake_user, password=snowflake_password, account=snowflake_account, role='ACCOUNTADMIN')
cur = c.cursor()

r = cur.execute(base_cmd, num_statements=base_cmd.count(';'))
succ = r.fetchall()[0][0] == 'Statement executed successfully.'
if succ:
print(f'Role {role_name} created and access granted to {snowflake_user}')
print(f'Database {db_name} created')
print(f'Warehouse {warehouse_name} crated')
print(f'Warehouse {warehouse_name} crated')
print(f'Compute Pool {compute_pool_name} crated')
print(f'Created Role {role_name} and access granted to {snowflake_user}')
print(f'Created Database {db_name}')
print(f'Created Warehouse {warehouse_name}')
print(f'Created Compute Pool {compute_pool_name}')

create_db_objects_cmd = f"""
USE ROLE {role_name};
Expand All @@ -247,9 +284,11 @@ def snowflake_common_setup(snowflake_user, snowflake_account, snowflake_password

succ = r.fetchall()[0][0] == 'Statement executed successfully.'
if succ:
print(f'Schema {schema_name} crated')
print(f'Repository {repo_name} crated')
print(f'Stage {stage_name} crated')
print(f'Created Schema {schema_name}')
print(f'Created Repository {repo_name}')
print(f'Created Stage {stage_name}')


else:
print('Failure creating Schema, Repository and Stage!')

Expand All @@ -274,8 +313,7 @@ def verify_image_repo(verify_prefix):
return response_repo_url

repo_url = verify_image_repo(verify_prefix)

print(f'Remote repo URL is {repo_url}')
print(f'Created Snowflake Container Repository {repo_url}')
return role_name, db_name, warehouse_name, schema_name, compute_pool_name, repo_url


Expand All @@ -292,7 +330,8 @@ def deploy_as_snowflake_udf(nlu_ref,
license_path=None,
udf_name=None,
service_name=None,

service_creation_timeout = 60*20,
destroy_image=False,
):
client = get_client(snowflake_user, snowflake_password, snowflake_account, warehouse_name, database_name,
schema_name, role_name)
Expand All @@ -311,23 +350,25 @@ def deploy_as_snowflake_udf(nlu_ref,
if not udf_name:
udf_name = f'{clean_nlu_ref}_udf'.replace('-', '_')

# TODO block while status pending optinally
# r = get_service_logs(snowflake_user, snowflake_password, snowflake_account, warehouse_name, database_name,
# schema_name, role_name, service_name)
# print(r)
# 2. Local Docker Setup, Tests and Push to Snowflake
build_test_and_push_image(nlu_ref, license_path, image_name, container_name, port, repo_url)
build_test_and_push_image(nlu_ref, license_path, image_name, container_name, port, repo_url, snowflake_user, snowflake_password, destroy_image)

# 3. Snowflake: Create service, create udf and test udf
print(f'Starting Snowflake Procedure')
create_service(client, service_name, compute_pool_name, remote_image, role_name, database_name, warehouse_name,
schema_name)
print(f'Service {service_name} created')
time.sleep(1 * 60) # wait ~ n seconds for container sto spin up, expo backup..!
print(f'Created Service {service_name}')
wait_until_service_created(client, service_name, service_creation_timeout)
# time.sleep(1 * 60) # wait ~ n seconds for container sto spin up, expo backup..!
create_udf(client, service_name, udf_name, role_name, database_name, warehouse_name, schema_name)
print(f'UDF {udf_name} created')
print(f'Created UDF {udf_name}')

print('testing UDF...')
test_udf(client, udf_name)

return udf_name


2 changes: 1 addition & 1 deletion johnsnowlabs/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

# These versions are used for auto-installs and version checks

raw_version_jsl_lib = "5.5.0"
raw_version_jsl_lib = "5.5.1"

raw_version_nlp = "5.5.0"

Expand Down
26 changes: 17 additions & 9 deletions johnsnowlabs/utils/py_process.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import subprocess
import sys
from pathlib import Path
from typing import List, Callable
from typing import List, Callable, Union

import colorama
import pandas as pd
Expand All @@ -10,13 +10,15 @@
from johnsnowlabs.utils.file_utils import str_to_file

def run_cmd_and_check_succ(
args: List[str],
args: Union[List[str],str],
log=True,
suc_print=johnsnowlabs.utils.testing.test_settings.success_worker_print,
return_pipes=False,
shell=False,
raise_on_fail=False,
use_code=False,
text=False,
log_outputs=True,
) -> bool:
if log:
if len(args) == 1:
Expand All @@ -28,7 +30,7 @@ def run_cmd_and_check_succ(
f"👷 Executing {colorama.Fore.LIGHTGREEN_EX}{args}{colorama.Fore.RESET}"
)

r = subprocess.run(args, capture_output=True, shell=shell)
r = subprocess.run(args, capture_output=True, shell=shell, text=text)
was_suc = process_was_suc(r, suc_print=suc_print, use_code=use_code)
if log:
if was_suc:
Expand All @@ -40,7 +42,7 @@ def run_cmd_and_check_succ(
f"{colorama.Fore.LIGHTRED_EX}❌ Failure running {args}{colorama.Fore.LIGHTGREEN_EX}"
)

if log:
if log_outputs:
log_process(r)
if raise_on_fail and not was_suc:
raise ValueError(f"Failed running {args}")
Expand All @@ -59,14 +61,20 @@ def process_was_suc(
elif use_code and result.returncode == 0:
return True
else:
return suc_print in result.stdout.decode()
try:
return suc_print in result.stdout.decode()
except Exception as err:
return suc_print in result.stdout


def log_process(result: subprocess.CompletedProcess):
print("______________STDOUT:")
print(result.stdout.decode())
print("______________STDERR:")
print(result.stderr.decode())
stdout = result.stdout.decode() if hasattr(result.stdout, "decode") else result.stdout
stderr = result.stderr.decode() if hasattr(result.stderr, "decode") else result.stderr
if stdout:
print(f"STDOUT: {stdout}")
if stderr:
print(f"STDERR: {stderr}")




Expand Down
Loading

0 comments on commit 921f1ad

Please sign in to comment.