Skip to content

Commit

Permalink
Merge pull request #195 from treeverse/glue-trino-demo-use-python-wrapper

Browse files Browse the repository at this point in the history

Update Glue Trino demo to use Python Wrapper
  • Loading branch information
iddoavn authored May 3, 2024
2 parents 8fdea25 + a70d222 commit a0d9c50
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 49 deletions.
4 changes: 2 additions & 2 deletions 01_standalone_examples/aws-glue-trino/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ This repository includes a Jupyter Notebook which you can run on your local mach

2. Change `lakeFS Endpoint URL`, `Access Key` and `Secret Key` in `trino_configurations.json` file included in the Git repo in `lakeFS-samples/01_standalone_examples/aws-glue-trino` folder.

3. Run following AWS CLI command to create an EMR cluster. Change AWS `region_name`, `log-uri`, `ec2_subnet_name` before running the command.
3. Run following AWS CLI command to create an EMR cluster. Change AWS `region_name`, `log-uri`, `ec2_subnet_name` before running the command. lakeFS Python SDK requires Python v3.9 or above. Python v3.9 is supported starting with EMR v7.0.0.

```bash
aws emr create-cluster \
--release-label emr-6.11.1 \
--release-label emr-7.0.0 \
--applications Name=Trino Name=JupyterEnterpriseGateway Name=Spark \
--configurations file://trino_configurations.json \
--region region_name \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@
},
"outputs": [],
"source": [
"sc.install_pypi_package(\"lakefs_client==0.110.0\")"
"sc.install_pypi_package(\"lakefs==0.6.0\")"
]
},
{
Expand Down Expand Up @@ -274,16 +274,28 @@
},
"outputs": [],
"source": [
"import lakefs_client\n",
"from lakefs_client.models import *\n",
"from lakefs_client.client import LakeFSClient\n",
"from lakefs_client.exceptions import NotFoundException\n",
"import lakefs\n",
"import os\n",
"from pyspark.sql.types import ByteType, IntegerType, LongType, StringType, StructType, StructField\n",
"from pyspark.sql.functions import *\n",
"from pyhive import trino\n",
"import requests"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "da23fe23-042a-4c86-95e2-3aa92ac92d79",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"os.environ[\"LAKECTL_SERVER_ENDPOINT_URL\"] = lakefsEndPoint\n",
"os.environ[\"LAKECTL_CREDENTIALS_ACCESS_KEY_ID\"] = lakefsAccessKey\n",
"os.environ[\"LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY\"] = lakefsSecretKey"
]
},
{
"cell_type": "markdown",
"id": "eaa5bb3f-7432-4637-bc8e-68159bdcb969",
Expand All @@ -297,19 +309,19 @@
{
"cell_type": "code",
"execution_count": null,
"id": "4795b1e7-761e-4f7a-840b-982f99ff3c6a",
"id": "ed8c6074-b9d9-41d3-b27a-767148d50393",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# lakeFS credentials and endpoint\n",
"configuration = lakefs_client.Configuration()\n",
"configuration.username = lakefsAccessKey\n",
"configuration.password = lakefsSecretKey\n",
"configuration.host = lakefsEndPoint\n",
"\n",
"lakefs = LakeFSClient(configuration)"
"print(\"Verifying lakeFS credentials\")\n",
"try:\n",
" v=lakefs.client.Client().version\n",
"except:\n",
" print(\"Failed to get lakeFS version\")\n",
"else:\n",
" print(f\"lakeFS credentials verified\\n\\nlakeFS version {v}\")"
]
},
{
Expand All @@ -323,25 +335,15 @@
{
"cell_type": "code",
"execution_count": null,
"id": "384aa4d3-038a-4a56-b30e-c536e3911478",
"id": "163ecc3f-eaa9-478a-b929-9fd5ffac385b",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"try:\n",
" repo=lakefs.repositories.get_repository(repo_name)\n",
" print(f\"Found existing repo {repo.id} using storage namespace {repo.storage_namespace}\")\n",
"except NotFoundException as f:\n",
" print(f\"Repository {repo_name} does not exist, so going to try and create it now.\")\n",
" try:\n",
" repo=lakefs.repositories.create_repository(repository_creation=RepositoryCreation(name=repo_name,\n",
" storage_namespace=f\"{storageNamespace}/{repo_name}\"))\n",
" print(f\"Created new repo {repo.id} using storage namespace {repo.storage_namespace}\")\n",
" except lakefs_client.ApiException as e:\n",
" print(f\"Error creating repo {repo_name}. Error is {e}\")\n",
"except lakefs_client.ApiException as e:\n",
" print(f\"Error getting repo {repo_name}: {e}\")"
"repo = lakefs.Repository(repo_name).create(storage_namespace=f\"{storageNamespace}/{repo_name}\", default_branch=mainBranch, exist_ok=True)\n",
"branchMain = repo.branch(mainBranch)\n",
"print(repo)"
]
},
{
Expand Down Expand Up @@ -406,7 +408,19 @@
" cur.execute(query)\n",
" result = cur.fetchall()\n",
"\n",
" return result"
" return result\n",
"\n",
"def print_commit(log):\n",
" from datetime import datetime\n",
" from pprint import pprint\n",
"\n",
" print('Message:', log.message)\n",
" print('ID:', log.id)\n",
" print('Committer:', log.committer)\n",
" print('Creation Date:', datetime.utcfromtimestamp(log.creation_date).strftime('%Y-%m-%d %H:%M:%S'))\n",
" print('Parents:', log.parents)\n",
" print('Metadata:')\n",
" pprint(log.metadata)"
]
},
{
Expand Down Expand Up @@ -598,7 +612,7 @@
},
"outputs": [],
"source": [
"print(f\"cd lakeFS-samples && lakectl fs upload -s ./data/OrionStar lakefs://{repo_name}/main/ --recursive && lakectl commit lakefs://{repo_name}/main -m 'Uploaded sample data'\")"
"print(f\"cd lakeFS-samples && lakectl fs upload -s ./data/OrionStar lakefs://{repo_name}/main/data/OrionStar --recursive && lakectl commit lakefs://{repo_name}/main -m 'Uploaded sample data'\")"
]
},
{
Expand Down Expand Up @@ -732,18 +746,13 @@
{
"cell_type": "code",
"execution_count": null,
"id": "d87a83ee-8547-4ed0-81a6-637f6f01838d",
"metadata": {
"tags": []
},
"id": "ef6c6bef-9159-4dec-a515-00820317d07d",
"metadata": {},
"outputs": [],
"source": [
"lakefs.commits.commit(\n",
" repository=repo_name,\n",
" branch=mainBranch,\n",
" commit_creation=CommitCreation(\n",
" message='Added customers and orders tables!', \n",
" metadata={'using': 'python_api'}))"
"ref = branchMain.commit(message='Added customers and orders tables!', \n",
" metadata={'using': 'python_api'})\n",
"print_commit(ref.get_commit())"
]
},
{
Expand Down Expand Up @@ -797,16 +806,12 @@
{
"cell_type": "code",
"execution_count": null,
"id": "9d7daab4-0479-4635-9d8b-9985298a9098",
"metadata": {
"tags": []
},
"id": "b955a4f9-df67-49c0-b7a9-d84d04bd9b8d",
"metadata": {},
"outputs": [],
"source": [
"lakefs.branches.create_branch(\n",
" repository=repo_name, \n",
" branch_creation=BranchCreation(\n",
" name=etlBranch, source=mainBranch))"
"branchETL = repo.branch(etlBranch).create(source_reference=mainBranch, exist_ok=True)\n",
"print(f\"{etlBranch} ref:\", branchETL.get_commit().id)"
]
},
{
Expand Down

0 comments on commit a0d9c50

Please sign in to comment.