From ebade847803f66831fc851c122932eee5df3b31e Mon Sep 17 00:00:00 2001
From: Amit Kesarwani <93291915+kesarwam@users.noreply.github.com>
Date: Fri, 10 May 2024 14:20:34 -0700
Subject: [PATCH] Updated Version Control of Multi Buckets Pipeline demo to
 use new Python Wrapper (#193)

The failing test (the R notebook) has already been removed in a newer
version.

Co-authored-by: Iddo Avneri
---
 ...n-control-of-multi-buckets-pipelines.ipynb | 310 +++++++-----------
 1 file changed, 121 insertions(+), 189 deletions(-)

diff --git a/00_notebooks/version-control-of-multi-buckets-pipelines.ipynb b/00_notebooks/version-control-of-multi-buckets-pipelines.ipynb
index df044299f..845308962 100644
--- a/00_notebooks/version-control-of-multi-buckets-pipelines.ipynb
+++ b/00_notebooks/version-control-of-multi-buckets-pipelines.ipynb
@@ -100,115 +100,112 @@
   },
   {
    "cell_type": "markdown",
-   "id": "81c00d9f",
+   "id": "1de09928-c6d8-45b5-a4d1-ca599d98bb8a",
    "metadata": {},
    "source": [
-    "### Create lakeFSClient"
+    "### Versioning Information "
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "542d15c6",
+   "id": "a7c2cfd5-2c67-44a3-bbc4-d014fcda2fe4",
    "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
+    "tags": []
    },
    "outputs": [],
    "source": [
-    "import lakefs_client\n",
-    "from lakefs_client.models import *\n",
-    "from lakefs_client.client import LakeFSClient\n",
+    "repoPrefix = \"multi-bucket-demo\"\n",
+    "mainBranch = \"main\"\n",
     "\n",
-    "# lakeFS credentials and endpoint\n",
-    "configuration = lakefs_client.Configuration()\n",
-    "configuration.username = lakefsAccessKey\n",
-    "configuration.password = lakefsSecretKey\n",
-    "configuration.host = lakefsEndPoint\n",
+    "bronzeIngestionBranch = \"bronze-ingestion\"\n",
+    "silverETLBranch = \"silver-etl\"\n",
+    "silverDataPath = \"silver_data\"\n",
     "\n",
-    "lakefs = LakeFSClient(configuration)"
+    "fileName = \"lakefs_test.csv\""
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "f6523b27",
+   "id": "3678c203-e25a-462f-96ac-d89a6317efeb",
    "metadata": {},
    "source": [
-    "#### Verify lakeFS credentials by getting lakeFS version"
+    "### Import libraries"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "7d4a3a20",
+   "id": "d1fc50db-07bd-401f-a5c8-50eaaad72d45",
    "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
+    "tags": []
    },
    "outputs": [],
    "source": [
-    "print(\"Verifying lakeFS credentials…\")\n",
-    "try:\n",
-    "    v=lakefs.config.get_config()\n",
-    "except:\n",
-    "    print(\"🛑 failed to get lakeFS version\")\n",
-    "else:\n",
-    "    print(f\"…✅lakeFS credentials verified\\n\\nℹ️lakeFS version {v['version_config']['version']}\")"
+    "import os\n",
+    "import lakefs\n",
+    "from assets.lakefs_demo import print_commit, print_diff"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "c7a3984a",
+   "id": "93007fa6-3349-4689-bc21-c1f891b9f700",
    "metadata": {},
    "source": [
-    "### Set up Spark"
+    "### Set environment variables"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "8857019f",
+   "id": "fc8ff3eb-d6a8-4029-a06c-b18a609b48d4",
    "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
+    "tags": []
    },
    "outputs": [],
    "source": [
-    "from pyspark.sql import SparkSession\n",
-    "spark = SparkSession.builder.appName(\"lakeFS / Jupyter\") \\\n",
-    "    .config(\"spark.hadoop.fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\") \\\n",
-    "    .config(\"spark.hadoop.fs.s3a.endpoint\", lakefsEndPoint) \\\n",
-    "    .config(\"spark.hadoop.fs.s3a.path.style.access\", \"true\") \\\n",
-    "    .config(\"spark.hadoop.fs.s3a.access.key\", lakefsAccessKey) \\\n",
-    "    .config(\"spark.hadoop.fs.s3a.secret.key\", lakefsSecretKey) \\\n",
-    "    .config(\"spark.jars.packages\", \"io.delta:delta-core_2.12:2.3.0\") \\\n",
-    "    .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\") \\\n",
-    "    .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\") \\\n",
-    "    .config(\"spark.delta.logStore.class\", \"org.apache.spark.sql.delta.storage.S3SingleDriverLogStore\") \\\n",
-    "    .getOrCreate()\n",
-    "spark.sparkContext.setLogLevel(\"INFO\")\n",
-    "\n",
-    "spark"
+    "os.environ[\"LAKECTL_SERVER_ENDPOINT_URL\"] = lakefsEndPoint\n",
+    "os.environ[\"LAKECTL_CREDENTIALS_ACCESS_KEY_ID\"] = lakefsAccessKey\n",
+    "os.environ[\"LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY\"] = lakefsSecretKey"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "id": "a1087cfa-9b73-4e58-939c-948df7be78f3",
+   "metadata": {},
+   "source": [
+    "### Define lakeFS UI Endpoint"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "4ec91e02-6ef1-4adc-829a-0760c0fc9c0a",
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "if lakefsEndPoint.startswith('http://host.docker.internal'):\n",
+    "    lakefsUIEndPoint = 'http://127.0.0.1:8000'\n",
+    "elif lakefsEndPoint.startswith('http://lakefs:8000'):\n",
+    "    lakefsUIEndPoint = 'http://127.0.0.1:8000'\n",
+    "else:\n",
+    "    lakefsUIEndPoint = lakefsEndPoint"
+   ]
+  },
   {
    "cell_type": "markdown",
-   "id": "91f63635-4d23-4489-a360-8e6e42bb070d",
+   "id": "f6523b27",
    "metadata": {},
    "source": [
-    "### Define lakeFS Repository function"
+    "### Verify lakeFS credentials by getting lakeFS version"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "f72659d1",
+   "id": "7d4a3a20",
    "metadata": {
     "tags": [],
     "vscode": {
@@ -217,38 +214,27 @@
    },
    "outputs": [],
    "source": [
-    "from lakefs_client.exceptions import NotFoundException\n",
-    "\n",
-    "def create_repo(repo_name, storageNamespace):\n",
-    "    try:\n",
-    "        repo=lakefs.repositories.get_repository(repo_name)\n",
-    "        print(f\"Found existing repo {repo.id} using storage namespace {repo.storage_namespace}\")\n",
-    "    except NotFoundException as f:\n",
-    "        print(f\"Repository {repo_name} does not exist, so going to try and create it now.\")\n",
-    "        try:\n",
-    "            repo=lakefs.repositories.create_repository(repository_creation=RepositoryCreation(name=repo_name,\n",
-    "                                                                                              storage_namespace=f\"{storageNamespace}/{repo_name}\"))\n",
-    "            print(f\"Created new repo {repo.id} using storage namespace {repo.storage_namespace}\")\n",
-    "        except lakefs_client.ApiException as e:\n",
-    "            print(f\"Error creating repo {repo_name}. Error is {e}\")\n",
-    "            os._exit(00)\n",
-    "    except lakefs_client.ApiException as e:\n",
-    "        print(f\"Error getting repo {repo_name}: {e}\")\n",
-    "        os._exit(00)"
+    "print(\"Verifying lakeFS credentials…\")\n",
+    "try:\n",
+    "    v=lakefs.client.Client().version\n",
+    "except:\n",
+    "    print(\"🛑 failed to get lakeFS version\")\n",
+    "else:\n",
+    "    print(f\"…✅lakeFS credentials verified\\n\\nℹ️lakeFS version {v}\")"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "012d5511",
+   "id": "c7a3984a",
    "metadata": {},
    "source": [
-    "## Variables"
+    "### Set up Spark"
    ]
   },
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "8df9df87",
+   "id": "8857019f",
    "metadata": {
     "tags": [],
     "vscode": {
@@ -257,14 +243,21 @@
    },
    "outputs": [],
    "source": [
-    "repoPrefix = \"multi-bucket-demo\"\n",
-    "mainBranch = \"main\"\n",
-    "\n",
-    "bronzeIngestionBranch = \"bronze-ingestion\"\n",
-    "silverETLBranch = \"silver-etl\"\n",
-    "silverDataPath = \"silver_data\"\n",
-    "\n",
-    "fileName = \"lakefs_test.csv\""
+    "from pyspark.sql import SparkSession\n",
+    "spark = SparkSession.builder.appName(\"lakeFS / Jupyter\") \\\n",
+    "    .config(\"spark.hadoop.fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\") \\\n",
+    "    .config(\"spark.hadoop.fs.s3a.endpoint\", lakefsEndPoint) \\\n",
+    "    .config(\"spark.hadoop.fs.s3a.path.style.access\", \"true\") \\\n",
+    "    .config(\"spark.hadoop.fs.s3a.access.key\", lakefsAccessKey) \\\n",
+    "    .config(\"spark.hadoop.fs.s3a.secret.key\", lakefsSecretKey) \\\n",
+    "    .config(\"spark.jars.packages\", \"io.delta:delta-core_2.12:2.3.0\") \\\n",
+    "    .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\") \\\n",
+    "    .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\") \\\n",
+    "    .config(\"spark.delta.logStore.class\", \"org.apache.spark.sql.delta.storage.S3SingleDriverLogStore\") \\\n",
+    "    .getOrCreate()\n",
+    "spark.sparkContext.setLogLevel(\"INFO\")\n",
+    "\n",
+    "spark"
    ]
   },
@@ -377,29 +370,6 @@
     "goldBucketName = f\"{baseStorageNamespace}/{repoPrefix}-{environment}-gold\""
    ]
   },
-  {
-   "cell_type": "markdown",
-   "id": "2703ec85-618d-4183-a195-be06383fd4fb",
-   "metadata": {},
-   "source": [
-    "## Verify user for Python client"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "08117df8-9f49-4bf1-a65c-1e096abb6a65",
-   "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
-   },
-   "outputs": [],
-   "source": [
-    "lakefs.auth.get_current_user()"
-   ]
-  },
   {
    "cell_type": "markdown",
    "id": "344aeba6-88b7-4ca4-8663-f0e7f50afcb4",
@@ -411,13 +381,15 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "2ad8a5c6-6cb8-44da-8d03-b8fdd700b5d6",
+   "id": "59184c81-88f5-40ac-81c6-fb275ca30b23",
    "metadata": {
     "tags": []
    },
    "outputs": [],
    "source": [
-    "create_repo(bronzeRepo,bronzeRepoStorageNamespace)"
+    "repoBronze = lakefs.Repository(bronzeRepo).create(storage_namespace=bronzeRepoStorageNamespace, default_branch=mainBranch, exist_ok=True)\n",
+    "repoBronzeBranchMain = repoBronze.branch(mainBranch)\n",
+    "print(repoBronze)"
    ]
   },
   {
@@ -431,13 +403,14 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "b963cb6b-5e16-424f-a9b5-7a05ac8aa201",
+   "id": "d29fec72-98e9-4f2e-960f-a652d88138ab",
    "metadata": {
     "tags": []
    },
    "outputs": [],
    "source": [
-    "create_repo(silverRepo,silverRepoStorageNamespace)"
+    "repoSilver = lakefs.Repository(silverRepo).create(storage_namespace=silverRepoStorageNamespace, default_branch=mainBranch, exist_ok=True)\n",
+    "print(repoSilver)"
    ]
   },
   {
@@ -451,20 +424,14 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "982e7bbe-fc0a-446b-88a6-d4a2b27fe702",
+   "id": "d79e69f6-1728-4d35-85aa-88b97b3de811",
    "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
+    "tags": []
    },
    "outputs": [],
    "source": [
-    "lakefs.branches.create_branch(\n",
-    "    repository=bronzeRepo,\n",
-    "    branch_creation=BranchCreation(\n",
-    "        name=bronzeIngestionBranch,\n",
-    "        source=mainBranch))"
+    "branchBronzeIngestion = repoBronze.branch(bronzeIngestionBranch).create(source_reference=mainBranch)\n",
+    "print(f\"{bronzeIngestionBranch} ref:\", branchBronzeIngestion.get_commit().id)"
    ]
   },
   {
@@ -478,20 +445,14 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "15155243-ba86-4ef3-a517-ce3378f940d0",
+   "id": "f965d40f-2770-412a-8e79-37bd067e8da1",
    "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
+    "tags": []
    },
    "outputs": [],
    "source": [
-    "contentToUpload = open(f\"/data/{fileName}\", 'rb') # Only a single file per upload which must be named \\\"content\\\"\n",
-    "lakefs.objects.upload_object(\n",
-    "    repository=bronzeRepo,\n",
-    "    branch=bronzeIngestionBranch,\n",
-    "    path=fileName, content=contentToUpload)"
+    "contentToUpload = open(f\"/data/{fileName}\", 'r').read()\n",
+    "branchBronzeIngestion.object(fileName).upload(data=contentToUpload, mode='wb', pre_sign=False)"
    ]
   },
   {
@@ -505,28 +466,23 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "dd137848-b6ac-4d79-8f27-bd500fba08c2",
+   "id": "e2a43653-056b-417e-b24f-767d05ceadaf",
    "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
+    "tags": []
    },
    "outputs": [],
    "source": [
     "dataClassification = 'raw-green'\n",
     "source = 'bronze'\n",
-    "target = lakefsEndPoint + '/repositories/' + bronzeRepo + '/object?ref=' + bronzeIngestionBranch + '&path=' + fileName\n",
+    "target = lakefsUIEndPoint + '/repositories/' + bronzeRepo + '/object?ref=' + bronzeIngestionBranch + '&path=' + fileName\n",
     "\n",
-    "lakefs.commits.commit(\n",
-    "    repository=bronzeRepo,\n",
-    "    branch=bronzeIngestionBranch,\n",
-    "    commit_creation=CommitCreation(\n",
+    "ref = branchBronzeIngestion.commit(\n",
     "    message='Added my first file in ' + bronzeRepo + ' repository!',\n",
     "    metadata={'using': 'python_api',\n",
     "              'data classification': dataClassification,\n",
     "              '::lakefs::source::url[url:ui]': source,\n",
-    "              '::lakefs::target::url[url:ui]': target}))"
+    "              '::lakefs::target::url[url:ui]': target})\n",
+    "print_commit(ref.get_commit())"
    ]
   },
   {
@@ -540,19 +496,14 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "7c1778eb-682d-4a57-8de3-577248bde2f0",
+   "id": "4f3adc90-65e2-4ad6-beb8-bb50962d4807",
    "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
+    "tags": []
    },
    "outputs": [],
    "source": [
-    "lakefs.refs.merge_into_branch(\n",
-    "    repository=bronzeRepo,\n",
-    "    source_ref=bronzeIngestionBranch, \n",
-    "    destination_branch=mainBranch)"
+    "res = branchBronzeIngestion.merge_into(mainBranch)\n",
+    "print(res)"
    ]
   },
   {
@@ -592,17 +543,14 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "07558cba-4c71-4bed-8c91-e2d4f76a9656",
+   "id": "28b49980-d79f-4faf-a1ba-226e50991a12",
    "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
+    "tags": []
    },
    "outputs": [],
    "source": [
-    "bronzeCommits = lakefs.refs.log_commits(repository=bronzeRepo, ref=mainBranch, amount=1, objects=[fileName])\n",
-    "print(bronzeCommits.results)"
+    "bronzeCommits = list(repoBronzeBranchMain.log(max_amount=1, objects=[fileName]))\n",
+    "print_commit(bronzeCommits[0])"
    ]
   },
   {
@@ -616,20 +564,14 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "7eb696f7-c1cf-4f7f-ab84-5da402bd8c4a",
+   "id": "ef0f906e-529c-41a2-aa46-88d6258c7194",
    "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
+    "tags": []
    },
    "outputs": [],
    "source": [
-    "lakefs.branches.create_branch(\n",
-    "    repository=silverRepo,\n",
-    "    branch_creation=BranchCreation(\n",
-    "        name=silverETLBranch,\n",
-    "        source=mainBranch))"
+    "branchSilverETL = repoSilver.branch(silverETLBranch).create(source_reference=mainBranch)\n",
+    "print(f\"{silverETLBranch} ref:\", branchSilverETL.get_commit().id)"
    ]
   },
   {
@@ -668,30 +610,25 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "9cf4faeb-1055-4b1c-a63c-b3ecb91cc2d3",
+   "id": "b3480497-78f7-4d96-b57a-5ce4335986f8",
    "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
+    "tags": []
    },
    "outputs": [],
    "source": [
     "dataClassification = 'transformed-green'\n",
-    "source = lakefsEndPoint + '/repositories/' + bronzeRepo + '/object?ref=' + mainBranch + '&path=' + fileName\n",
-    "source_commit = lakefsEndPoint + '/repositories/' + bronzeRepo + '/commits/' + bronzeCommits.results[0].id\n",
-    "target = lakefsEndPoint + '/repositories/' + silverRepo + '/objects?ref=' + silverETLBranch + '&path=' + silverDataPath + '/'\n",
+    "source = lakefsUIEndPoint + '/repositories/' + bronzeRepo + '/object?ref=' + mainBranch + '&path=' + fileName\n",
+    "source_commit = lakefsUIEndPoint + '/repositories/' + bronzeRepo + '/commits/' + bronzeCommits[0].id\n",
+    "target = lakefsUIEndPoint + '/repositories/' + silverRepo + '/objects?ref=' + silverETLBranch + '&path=' + silverDataPath + '/'\n",
     "\n",
-    "lakefs.commits.commit(\n",
-    "    repository=silverRepo,\n",
-    "    branch=silverETLBranch,\n",
-    "    commit_creation=CommitCreation(\n",
+    "ref = branchSilverETL.commit(\n",
     "    message='Added transformed data in ' + silverRepo + ' repository!',\n",
     "    metadata={'using': 'python_api',\n",
     "              'data classification': dataClassification,\n",
     "              '::lakefs::source::url[url:ui]': source,\n",
     "              '::lakefs::source_commit::url[url:ui]': source_commit,\n",
-    "              '::lakefs::target::url[url:ui]': target}))"
+    "              '::lakefs::target::url[url:ui]': target})\n",
+    "print_commit(ref.get_commit())"
    ]
   },
   {
@@ -705,19 +642,14 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "id": "d74c001f-2ced-4492-af30-e58bfee8653a",
+   "id": "f6fc05ff-7f25-438e-b1c3-031fc04b41ae",
    "metadata": {
-    "tags": [],
-    "vscode": {
-     "languageId": "python"
-    }
+    "tags": []
    },
    "outputs": [],
    "source": [
-    "lakefs.refs.merge_into_branch(\n",
-    "    repository=silverRepo,\n",
-    "    source_ref=silverETLBranch, \n",
-    "    destination_branch=mainBranch)"
+    "res = branchSilverETL.merge_into(mainBranch)\n",
+    "print(res)"
    ]
   },
   {