diff --git a/.AL-Go/settings.json b/.AL-Go/settings.json
index 978975c..1bd4a15 100644
--- a/.AL-Go/settings.json
+++ b/.AL-Go/settings.json
@@ -7,5 +7,5 @@
         "businessCentral\\test"
     ],
     "bcptTestFolders": [],
-    "repoVersion": "24.23"
+    "repoVersion": "24.25"
 }
diff --git a/businessCentral/app/app.json b/businessCentral/app/app.json
index 0976e26..85bfb6d 100644
--- a/businessCentral/app/app.json
+++ b/businessCentral/app/app.json
@@ -4,7 +4,7 @@
     "publisher": "The bc2adls team",
     "brief": "Sync data from Business Central to the Azure storage",
     "description": "Exports data in chosen tables to the Azure Data Lake and keeps it in sync by incremental updates. Before you use this tool, please read the SUPPORT.md file at https://github.com/microsoft/bc2adls.",
-    "version": "24.23.0.0",
+    "version": "24.25.0.0",
     "privacyStatement": "https://go.microsoft.com/fwlink/?LinkId=724009",
     "EULA": "https://go.microsoft.com/fwlink/?linkid=2009120",
     "help": "https://go.microsoft.com/fwlink/?LinkId=724011",
diff --git a/businessCentral/app/src/Communication.Codeunit.al b/businessCentral/app/src/Communication.Codeunit.al
index 1a2b9e7..27a2d4c 100644
--- a/businessCentral/app/src/Communication.Codeunit.al
+++ b/businessCentral/app/src/Communication.Codeunit.al
@@ -32,6 +32,7 @@ codeunit 82562 "ADLSE Communication"
         EntitySchemaChangedErr: Label 'The schema of the table %1 has changed. %2', Comment = '%1 = Entity name, %2 = NotAllowedOnSimultaneousExportTxt';
         CdmSchemaChangedErr: Label 'There may have been a change in the tables to export. %1', Comment = '%1 = NotAllowedOnSimultaneousExportTxt';
         MSFabricUrlTxt: Label 'https://onelake.dfs.fabric.microsoft.com/%1/%2.Lakehouse/Files', Locked = true, Comment = '%1: Workspace name, %2: Lakehouse Name';
+        MSFabricUrlGuidTxt: Label 'https://onelake.dfs.fabric.microsoft.com/%1/%2/Files', Locked = true, Comment = '%1: Workspace name, %2: Lakehouse Name';
         ResetTableExportTxt: Label '/reset/%1.txt', Locked = true, comment = '%1 = Table name';
 
     procedure SetupBlobStorage()
@@ -47,6 +48,7 @@
     local procedure GetBaseUrl(): Text
     var
         ADLSESetup: Record "ADLSE Setup";
+        ValidGuid: Guid;
     begin
         ADLSESetup.GetSingleton();
         case ADLSESetup.GetStorageType() of
@@ -58,8 +60,10 @@
                     exit(StrSubstNo(ContainerUrlTxt, ADLSESetup."Account Name", DefaultContainerName));
                 end;
             ADLSESetup."Storage Type"::"Microsoft Fabric":
-                exit(StrSubstNo(MSFabricUrlTxt, ADLSESetup.Workspace, ADLSESetup.Lakehouse));
-
+                if not Evaluate(ValidGuid, ADLSESetup.Lakehouse) then
+                    exit(StrSubstNo(MSFabricUrlTxt, ADLSESetup.Workspace, ADLSESetup.Lakehouse))
+                else
+                    exit(StrSubstNo(MSFabricUrlGuidTxt, ADLSESetup.Workspace, ADLSESetup.Lakehouse));
         end;
     end;
 
diff --git a/businessCentral/app/src/CurrentSession.Table.al b/businessCentral/app/src/CurrentSession.Table.al
index 20af953..1d06e31 100644
--- a/businessCentral/app/src/CurrentSession.Table.al
+++ b/businessCentral/app/src/CurrentSession.Table.al
@@ -83,6 +83,7 @@ table 82565 "ADLSE Current Session"
 
     procedure AreAnySessionsActive() AnyActive: Boolean
     begin
+        Rec.SetRange("Company Name", CopyStr(CompanyName(), 1, 30));
         if Rec.FindSet(false) then
             repeat
                 if IsSessionActive() then begin
@@ -94,8 +95,8 @@ table 82565 "ADLSE Current Session"
 
     procedure CleanupSessions()
    begin
-        Rec.SetRange("Company Name", CompanyName());
-        Rec.DeleteAll();
+        Rec.SetRange("Company Name", CopyStr(CompanyName(), 1, 30));
+        Rec.DeleteAll(false);
     end;
 
     procedure CancelAll()
diff --git a/businessCentral/app/src/FieldAPI.Page.al b/businessCentral/app/src/FieldAPI.Page.al
index eefb702..8da7dc2 100644
--- a/businessCentral/app/src/FieldAPI.Page.al
+++ b/businessCentral/app/src/FieldAPI.Page.al
@@ -50,6 +50,21 @@ page 82567 "ADLSE Field API"
         SetActionResponse(ActionContext, Rec.SystemId);
     end;
 
+    [ServiceEnabled]
+    procedure Enable(var ActionContext: WebServiceActionContext)
+    var
+        SelectedADLSEField: Record "ADLSE Field";
+    begin
+        CurrPage.SetSelectionFilter(SelectedADLSEField);
+        if SelectedADLSEField.FindSet(true) then
+            repeat
+                SelectedADLSEField.Validate(Enabled, true);
+                SelectedADLSEField.Modify(true);
+            until SelectedADLSEField.Next() = 0;
+        SetActionResponse(ActionContext, Rec.SystemId);
+    end;
+
+
     local procedure SetActionResponse(var ActionContext: WebServiceActionContext; AdlsId: Guid)
     var
     begin
diff --git a/businessCentral/app/src/Setup.Table.al b/businessCentral/app/src/Setup.Table.al
index c6d4892..50fefbc 100644
--- a/businessCentral/app/src/Setup.Table.al
+++ b/businessCentral/app/src/Setup.Table.al
@@ -107,10 +107,30 @@ table 82560 "ADLSE Setup"
         field(30; Workspace; Text[100])
         {
             Caption = 'Workspace';
+            trigger OnValidate()
+            var
+                ValidGuid: Guid;
+            begin
+                if not Evaluate(ValidGuid, Rec.Workspace) then
+                    if (StrLen(Rec.Workspace) < 3) or (StrLen(Rec.Workspace) > 24)
+                        or TextCharactersOtherThan(Rec.Workspace, 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_')
+                    then
+                        Error(WorkspaceIncorrectFormatErr);
+            end;
         }
         field(31; Lakehouse; Text[100])
         {
             Caption = 'Lakehouse';
+            trigger OnValidate()
+            var
+                ValidGuid: Guid;
+            begin
+                if not Evaluate(ValidGuid, Rec.Lakehouse) then
+                    if (StrLen(Rec.Lakehouse) < 3) or (StrLen(Rec.Lakehouse) > 24)
+                        or TextCharactersOtherThan(Rec.Lakehouse, 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_')
+                    then
+                        Error(LakehouseIncorrectFormatErr);
+            end;
         }
         field(35; "Schema Exported On"; DateTime)
         {
@@ -172,8 +192,10 @@
 
     var
         MaxReqErrorInfo: ErrorInfo;
-        ContainerNameIncorrectFormatErr: Label 'The container name is in an incorrect format.';
-        AccountNameIncorrectFormatErr: Label 'The account name is in an incorrect format.';
+        ContainerNameIncorrectFormatErr: Label 'The container name is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890_';
+        AccountNameIncorrectFormatErr: Label 'The account name is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890';
+        WorkspaceIncorrectFormatErr: Label 'The workspace is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890_ or a valid GUID';
+        LakehouseIncorrectFormatErr: Label 'The lakehouse is in an incorrect format. Please only use abcdefghijklmnopqrstuvwxyz1234567890_ or a valid GUID';
         RecordDoesNotExistErr: Label 'No record on this table exists.';
         PrimaryKeyValueLbl: Label '0', Locked = true;
         NoSchemaExportedErr: Label 'Schema already exported.
Please perform the action "clear schema export date" before changing the schema.'; diff --git a/businessCentral/test/app.json b/businessCentral/test/app.json index 0de1d93..bbf759d 100644 --- a/businessCentral/test/app.json +++ b/businessCentral/test/app.json @@ -2,7 +2,7 @@ "id": "03486619-1622-4261-ae0e-b366b3c96e3c", "name": "Azure Data Lake Storage Export Tests", "publisher": "The bc2adls team", - "version": "24.23.0.0", + "version": "24.25.0.0", "brief": "Tests for Azure Data Lake Storage Export", "description": "Tests for Azure Data Lake Storage Export", "privacyStatement": "", diff --git a/fabric/CopyBusinessCentral.ipynb b/fabric/CopyBusinessCentral.ipynb index 2bc3f59..a486067 100644 --- a/fabric/CopyBusinessCentral.ipynb +++ b/fabric/CopyBusinessCentral.ipynb @@ -1 +1 @@ -{"cells":[{"cell_type":"markdown","id":"ae53e9bf-8787-4d07-b709-d896fd16cc5f","metadata":{"editable":false,"nteract":{"transient":{"deleting":false}},"run_control":{"frozen":false}},"source":["## Business Central merge data notebook\n","In this part the files in the delta folder will be merge with the Lakehouse table.\n","- It iterates first on the folders to append to the existing table.\n","- After that is will remove all duplicates by sorting the table. \n","- At last it will remove all deleted records inside the table that are deleted in Business Central\n","\n","Please change the parameters in the first part."]},{"cell_type":"code","execution_count":null,"id":"34dc5721-e317-4dc0-88ef-2c6bafb494da","metadata":{"cellStatus":"{\"MOD Administrator\":{\"queued_time\":\"2023-08-15T09:15:05.6812441Z\",\"session_start_time\":null,\"execution_start_time\":\"2023-08-15T09:15:06.8530455Z\",\"execution_finish_time\":\"2023-08-15T09:15:07.1828235Z\",\"state\":\"finished\",\"livy_statement_state\":\"available\"}}","editable":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python"},"nteract":{"transient":{"deleting":false}},"run_control":{"frozen":false}},"outputs":[],"source":["%%pyspark\n","# settings\n","spark.conf.set(\"sprk.sql.parquet.vorder.enabled\",\"true\")\n","spark.conf.set(\"spark.microsoft.delta.optimizewrite.enabled\",\"true\")\n","spark.conf.set(\"spark.sql.parquet.filterPushdown\", \"true\")\n","spark.conf.set(\"spark.sql.parquet.mergeSchema\", \"false\")\n","spark.conf.set(\"spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version\", \"2\")\n","spark.conf.set(\"spark.sql.delta.commitProtocol.enabled\", \"true\")\n","\n","# file paths\n","folder_path_spark = 'Files/deltas/' # this is mostly the default\n","folder_path_json = '/lakehouse/default/Files/' # this is mostly the default\n","folder_path_reset = '/lakehouse/default/Files/reset/' # this is mostly the default\n","folder_path = '/lakehouse/default/Files/deltas/' # this is mostly the default\n","\n","# parameters\n","workspace = 'businessCentral' #can also be a GUID\n","Lakehouse = 'businessCentral'; #can also be a GUID\n","Remove_delta = True; #will remove the delta files if everything is processed\n","Drop_table_if_mismatch = False; #option to drop the table if json file has different columns then in the table\n","no_Partition = 258 #how many partition is used in the dataframe, a good starting point might be 2-4 partitions per CPU core in your Spark cluster\n","DecimalFormat = 'float' #how to format the decimal numbers, can be 'float' or 'decimal(10,3)'. If you change this it will be a breaking change for the table\n","DateTimeFormat = 'date' #how to format the datetime, can be 'timestamp' or 'date'. 
If you change this it will be a breaking change for the table"]},{"cell_type":"code","execution_count":null,"id":"33ddc3d7","metadata":{},"outputs":[],"source":["%%pyspark\n","import os\n","import json\n","from pyspark.sql.types import *\n","\n","if Drop_table_if_mismatch:\n","\n"," def count_keys(obj): \n"," if isinstance(obj, dict): \n"," return len(obj) + sum(count_keys(v) for v in obj.values()) \n"," if isinstance(obj, list): \n"," return sum(count_keys(v) for v in obj) \n"," return 0 \n","\n"," for filename in os.listdir(folder_path_json):\n"," if \"manifest\" not in filename: # exclude the manifest files\n"," if filename.endswith(\".cdm.json\"):\n"," table_name = filename.replace(\"-\",\"\")\n"," table_name = table_name.replace(\".cdm.json\",\"\")\n","\n"," if table_name in [t.name for t in spark.catalog.listTables()]:\n"," #count number of columns in excisting table\n"," SQL_Query = \"SELECT * FROM \" + Lakehouse +\".\"+table_name; \n"," df = spark.sql(SQL_Query)\n"," num_cols_table = len(df.columns) \n","\n"," #count number of columns in json file \n"," f = open(folder_path_json + filename)\n"," schema = json.load(f)\n"," has_attributes = schema[\"definitions\"][0][\"hasAttributes\"] \n"," num_names = len(has_attributes)\n","\n"," if num_cols_table != num_names:\n"," df = spark.sql(\"DROP TABLE IF EXISTS \"+ Lakehouse + \".\" + table_name)"]},{"cell_type":"code","execution_count":null,"id":"5669531f","metadata":{},"outputs":[],"source":["%%pyspark\n","import os\n","import glob\n","from pyspark.sql.types import *\n","\n","if os.path.exists(folder_path_reset):\n"," for filename in os.listdir(folder_path_reset):\n"," # Remove the table\n"," table_name = filename.replace(\"-\",\"\")\n"," table_name = table_name.replace(\".txt\",\"\")\n","\n"," df = spark.sql(\"DROP TABLE IF EXISTS \"+ Lakehouse + \".\" + table_name)\n","\n"," try: \n"," os.remove(folder_path_reset + '/' + filename) \n"," except OSError as e: # this would catch any error when trying to delete the file \n"," print(f\"Error: {filename} : {e.strerror}\")"]},{"cell_type":"code","execution_count":null,"id":"0594c099-6512-4777-82e2-9a3a058512fe","metadata":{"cellStatus":"{\"MOD Administrator\":{\"queued_time\":\"2023-08-15T09:15:05.7249665Z\",\"session_start_time\":null,\"execution_start_time\":\"2023-08-15T09:15:07.7601315Z\",\"execution_finish_time\":\"2023-08-15T09:15:18.128035Z\",\"state\":\"finished\",\"livy_statement_state\":\"available\"}}","collapsed":false,"editable":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python"},"nteract":{"transient":{"deleting":false}},"run_control":{"frozen":false}},"outputs":[],"source":["%%pyspark\n","import json\n","import os\n","import glob\n","from pyspark.sql.types import *\n","from pyspark.sql.utils import AnalysisException\n","from pyspark.sql.functions import col\n","from pyspark.sql.functions import desc\n","file_list = []\n","\n","for entry in os.scandir(folder_path):\n"," if entry.is_dir():\n","\n"," for filename in glob.glob(folder_path + entry.name + '/*'): \n"," table_name = entry.name.replace(\"-\",\"\")\n"," ContainsCompany = False\n"," df_new = spark.read.option(\"minPartitions\", no_Partition).format(\"csv\").option(\"header\",\"true\").load(folder_path_spark + entry.name +\"/*\") \n"," file_list.append(filename) #collect the imported filed in a list for deletion later on\n","\n"," f = open(folder_path_json + entry.name +\".cdm.json\")\n"," schema = json.load(f)\n"," # Parse the schema to get column names and data types\n"," 
column_names = [attr[\"name\"] for attr in schema[\"definitions\"][0][\"hasAttributes\"]] \n"," if '$Company' in column_names:\n"," ContainsCompany = True\n"," column_types = [attr['dataFormat'] for attr in schema[\"definitions\"][0][\"hasAttributes\"]] \n"," for col_name, col_type in zip(column_names, column_types):\n"," if col_type == \"String\":\n"," col_type = \"string\"\n"," if col_type == \"Guid\":\n"," col_type = \"string\"\n"," if col_type == \"Code\":\n"," col_type = \"object\"\n"," if col_type == \"Option\":\n"," col_type = \"string\"\n"," if col_type == \"Date\":\n"," col_type = \"date\"\n"," if col_type == \"Time\":\n"," col_type = \"string\"\n"," if col_type == \"DateTime\":\n"," col_type = DateTimeFormat\n"," if col_type == \"Duration\":\n"," col_type = \"timedelta\"\n"," if col_type == \"Decimal\":\n"," col_type = DecimalFormat\n"," if col_type == \"Boolean\":\n"," col_type = \"boolean\"\n"," if col_type == \"Integer\":\n"," col_type = \"int\"\n"," if col_type == \"Int64\":\n"," col_type = \"int\"\n"," if col_type == \"Int32\":\n"," col_type = \"int\"\n","\n"," df_new = df_new.withColumn(col_name, df_new[col_name].cast(col_type))\n","\n","\n"," #check if the table excists\n"," if table_name in [t.name for t in spark.catalog.listTables()]: \n"," #read the old data into a new dataframe and union with the new dataframe\n"," SQL_Query = \"SELECT * FROM \" + Lakehouse +\".\"+table_name; \n"," #print(SQL_Query)\n"," df_old = spark.sql(SQL_Query)\n"," df_new = df_new.union(df_old).repartition(no_Partition)\n","\n"," #delete all old records\n"," df_deletes = df_new.filter(df_new['SystemCreatedAt-2000000001'].isNull())\n"," if ContainsCompany:\n"," df_new = df_new.join(df_deletes, ['$Company','systemId-2000000000'], 'leftanti')\n"," else:\n"," df_new = df_new.join(df_deletes, ['systemId-2000000000'], 'leftanti')\n"," \n"," # remove duplicates by filtering on systemID and systemModifiedAt fields\n"," if ContainsCompany:\n"," df_new = df_new.orderBy('$Company','systemId-2000000000',desc('SystemModifiedAt-2000000003'))\n"," df_new = df_new.dropDuplicates(['$Company','systemId-2000000000'])\n"," else:\n"," df_new = df_new.orderBy('systemId-2000000000',desc('SystemModifiedAt-2000000003'))\n"," df_new = df_new.dropDuplicates(['systemId-2000000000'])\n"," \n"," #overwrite the dataframe in the new table\n"," df_new.write.mode(\"overwrite\").format(\"delta\").save(\"Tables/\" + table_name) \n"," else: \n"," #table isn't there so just insert it\n"," df_new.write.mode(\"overwrite\").format(\"delta\").save(\"Tables/\" + table_name)\n","\n"," #delete the files\n"," if Remove_delta:\n"," for filename in file_list: \n"," try: \n"," os.remove(filename) \n"," except OSError as e: # this would catch any error when trying to delete the file \n"," print(f\"Error: {filename} : {e.strerror}\")\n"," file_list = [] # clear the list"]}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse 
PySpark","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"host":{"synapse_widget":{"state":{},"token":"a69b4b72-86b0-4373-b695-ef01cd53bbb1"},"trident":{"lakehouse":{"default_lakehouse":"9fbacb3e-d0df-43a4-814b-abe4cb623a81","known_lakehouses":"[{\"id\":\"9fbacb3e-d0df-43a4-814b-abe4cb623a81\"}]"}}},"language":"python","ms_spell_check":{"ms_spell_check_language":"en"}},"notebook_environment":{},"nteract":{"version":"nteract-front-end@1.0.0"},"save_output":true,"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{},"enableDebugMode":false}},"synapse_widget":{"state":{},"version":"0.1"},"trident":{"lakehouse":{"default_lakehouse":"9fbacb3e-d0df-43a4-814b-abe4cb623a81","default_lakehouse_name":"businessCentral","default_lakehouse_workspace_id":"21a92229-a0fb-4256-86bd-4b847b8006ed","known_lakehouses":[{"id":"9fbacb3e-d0df-43a4-814b-abe4cb623a81"}]}},"widgets":{}},"nbformat":4,"nbformat_minor":5} +{"cells":[{"cell_type":"markdown","id":"ae53e9bf-8787-4d07-b709-d896fd16cc5f","metadata":{"editable":false,"nteract":{"transient":{"deleting":false}},"run_control":{"frozen":false}},"source":["## Business Central merge data notebook\n","In this part the files in the delta folder will be merge with the Lakehouse table.\n","- It iterates first on the folders to append to the existing table.\n","- After that is will remove all duplicates by sorting the table. \n","- At last it will remove all deleted records inside the table that are deleted in Business Central\n","\n","Please change the parameters in the first part."]},{"cell_type":"code","execution_count":null,"id":"34dc5721-e317-4dc0-88ef-2c6bafb494da","metadata":{"cellStatus":"{\"MOD Administrator\":{\"queued_time\":\"2023-08-15T09:15:05.6812441Z\",\"session_start_time\":null,\"execution_start_time\":\"2023-08-15T09:15:06.8530455Z\",\"execution_finish_time\":\"2023-08-15T09:15:07.1828235Z\",\"state\":\"finished\",\"livy_statement_state\":\"available\"}}","editable":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python"},"nteract":{"transient":{"deleting":false}},"run_control":{"frozen":false}},"outputs":[],"source":["%%pyspark\n","# settings\n","spark.conf.set(\"sprk.sql.parquet.vorder.enabled\",\"true\")\n","spark.conf.set(\"spark.microsoft.delta.optimizewrite.enabled\",\"true\")\n","spark.conf.set(\"spark.sql.parquet.filterPushdown\", \"true\")\n","spark.conf.set(\"spark.sql.parquet.mergeSchema\", \"false\")\n","spark.conf.set(\"spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version\", \"2\")\n","spark.conf.set(\"spark.sql.delta.commitProtocol.enabled\", \"true\")\n","\n","# file paths\n","folder_path_spark = 'Files/deltas/' # this is mostly the default\n","folder_path_json = '/lakehouse/default/Files/' # this is mostly the default\n","folder_path_reset = '/lakehouse/default/Files/reset/' # this is mostly the default\n","folder_path = '/lakehouse/default/Files/deltas/' # this is mostly the default\n","\n","# parameters\n","workspace = 'businessCentral' #can also be a GUID\n","Lakehouse = 'businessCentral'; #can also be a GUID\n","Remove_delta = True; #will remove the delta files if everything is processed\n","Drop_table_if_mismatch = False; #option to drop the table if json file has different columns then in the table\n","no_Partition = 258 #how many partition is used in the dataframe, a good starting point might be 2-4 partitions per CPU core in your Spark cluster\n","DecimalFormat = 'float' #how to format the decimal numbers, can be 'float' or 
'decimal(10,3)'. If you change this it will be a breaking change for the table\n","DateTimeFormat = 'date' #how to format the datetime, can be 'timestamp' or 'date'. If you change this it will be a breaking change for the table"]},{"cell_type":"code","execution_count":null,"id":"33ddc3d7","metadata":{},"outputs":[],"source":["%%pyspark\n","import os\n","import json\n","from pyspark.sql.types import *\n","\n","if Drop_table_if_mismatch:\n","\n"," def count_keys(obj): \n"," if isinstance(obj, dict): \n"," return len(obj) + sum(count_keys(v) for v in obj.values()) \n"," if isinstance(obj, list): \n"," return sum(count_keys(v) for v in obj) \n"," return 0 \n","\n"," for filename in os.listdir(folder_path_json):\n"," if \"manifest\" not in filename: # exclude the manifest files\n"," if filename.endswith(\".cdm.json\"):\n"," table_name = filename.replace(\"-\",\"\")\n"," table_name = table_name.replace(\".cdm.json\",\"\")\n","\n"," if table_name in [t.name for t in spark.catalog.listTables()]:\n"," #count number of columns in excisting table\n"," SQL_Query = \"SELECT * FROM \" + Lakehouse +\".\"+table_name; \n"," df = spark.sql(SQL_Query)\n"," num_cols_table = len(df.columns) \n","\n"," #count number of columns in json file \n"," f = open(folder_path_json + filename)\n"," schema = json.load(f)\n"," has_attributes = schema[\"definitions\"][0][\"hasAttributes\"] \n"," num_names = len(has_attributes)\n","\n"," if num_cols_table != num_names:\n"," df = spark.sql(\"DROP TABLE IF EXISTS \"+ Lakehouse + \".\" + table_name)"]},{"cell_type":"code","execution_count":null,"id":"5669531f","metadata":{},"outputs":[],"source":["%%pyspark\n","import os\n","import glob\n","from pyspark.sql.types import *\n","\n","if os.path.exists(folder_path_reset):\n"," for filename in os.listdir(folder_path_reset):\n"," # Remove the table\n"," table_name = filename.replace(\"-\",\"\")\n"," table_name = table_name.replace(\".txt\",\"\")\n","\n"," df = spark.sql(\"DROP TABLE IF EXISTS \"+ Lakehouse + \".\" + table_name)\n","\n"," try: \n"," os.remove(folder_path_reset + '/' + filename) \n"," except OSError as e: # this would catch any error when trying to delete the file \n"," print(f\"Error: {filename} : {e.strerror}\")"]},{"cell_type":"code","execution_count":null,"id":"0594c099-6512-4777-82e2-9a3a058512fe","metadata":{"cellStatus":"{\"MOD Administrator\":{\"queued_time\":\"2023-08-15T09:15:05.7249665Z\",\"session_start_time\":null,\"execution_start_time\":\"2023-08-15T09:15:07.7601315Z\",\"execution_finish_time\":\"2023-08-15T09:15:18.128035Z\",\"state\":\"finished\",\"livy_statement_state\":\"available\"}}","collapsed":false,"editable":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python"},"nteract":{"transient":{"deleting":false}},"run_control":{"frozen":false}},"outputs":[],"source":["%%pyspark\n","import json\n","import os\n","import glob\n","from pyspark.sql.types import *\n","from pyspark.sql.utils import AnalysisException\n","from pyspark.sql.functions import col\n","from pyspark.sql.functions import desc\n","file_list = []\n","\n","for entry in os.scandir(folder_path):\n"," if entry.is_dir():\n","\n"," for filename in glob.glob(folder_path + entry.name + '/*'): \n"," table_name = entry.name.replace(\"-\",\"\")\n"," ContainsCompany = False\n"," df_new = spark.read.option(\"minPartitions\", no_Partition).format(\"csv\").option(\"header\",\"true\").load(folder_path_spark + entry.name +\"/*\") \n"," file_list.append(filename) #collect the imported filed in a list for deletion 
later on\n","\n"," f = open(folder_path_json + entry.name +\".cdm.json\")\n"," schema = json.load(f)\n"," # Parse the schema to get column names and data types\n"," column_names = [attr[\"name\"] for attr in schema[\"definitions\"][0][\"hasAttributes\"]] \n"," if '$Company' in column_names:\n"," ContainsCompany = True\n"," column_types = [attr['dataFormat'] for attr in schema[\"definitions\"][0][\"hasAttributes\"]] \n"," for col_name, col_type in zip(column_names, column_types):\n"," if col_type == \"String\":\n"," col_type = \"string\"\n"," if col_type == \"Guid\":\n"," col_type = \"string\"\n"," if col_type == \"Code\":\n"," col_type = \"object\"\n"," if col_type == \"Option\":\n"," col_type = \"string\"\n"," if col_type == \"Date\":\n"," col_type = \"date\"\n"," if col_type == \"Time\":\n"," col_type = \"string\"\n"," if col_type == \"DateTime\":\n"," col_type = DateTimeFormat\n"," if col_type == \"Duration\":\n"," col_type = \"timedelta\"\n"," if col_type == \"Decimal\":\n"," col_type = DecimalFormat\n"," if col_type == \"Boolean\":\n"," col_type = \"boolean\"\n"," if col_type == \"Integer\":\n"," col_type = \"int\"\n"," if col_type == \"Int64\":\n"," col_type = \"int\"\n"," if col_type == \"Int32\":\n"," col_type = \"int\"\n"," if col_name == 'SystemModifiedAt-2000000003': #Audit fields must be in timestamp\n"," col_type = \"timestamp\"\n"," if col_name == 'SystemModifiedBy-2000000004': \n"," col_type = \"timestamp\"\n","\n"," df_new = df_new.withColumn(col_name, df_new[col_name].cast(col_type))\n","\n"," #check if the table exists\n"," if table_name in [t.name for t in spark.catalog.listTables()]: \n"," #read the old data into a new dataframe and union with the new dataframe\n"," SQL_Query = \"SELECT * FROM \" + Lakehouse +\".\"+table_name; \n"," #print(SQL_Query)\n"," df_old = spark.sql(SQL_Query)\n"," df_new = df_new.union(df_old).repartition(no_Partition)\n","\n"," #delete all old records\n"," df_deletes = df_new.filter(df_new['SystemCreatedAt-2000000001'].isNull())\n"," if ContainsCompany:\n"," df_new = df_new.join(df_deletes, ['$Company','systemId-2000000000'], 'leftanti')\n"," else:\n"," df_new = df_new.join(df_deletes, ['systemId-2000000000'], 'leftanti')\n"," \n"," # remove duplicates by filtering on systemID and systemModifiedAt fields\n"," if ContainsCompany:\n"," df_new = df_new.orderBy('$Company','systemId-2000000000',desc('SystemModifiedAt-2000000003'))\n"," df_new = df_new.dropDuplicates(['$Company','systemId-2000000000'])\n"," else:\n"," df_new = df_new.orderBy('systemId-2000000000',desc('SystemModifiedAt-2000000003'))\n"," df_new = df_new.dropDuplicates(['systemId-2000000000'])\n"," \n"," #overwrite the dataframe in the new table\n"," df_new.write.mode(\"overwrite\").format(\"delta\").save(\"Tables/\" + table_name) \n"," else: \n"," #table isn't there so just insert it\n"," df_new.write.mode(\"overwrite\").format(\"delta\").save(\"Tables/\" + table_name)\n","\n"," #delete the files\n"," if Remove_delta:\n"," for filename in file_list: \n"," try: \n"," os.remove(filename) \n"," except OSError as e: # this would catch any error when trying to delete the file \n"," print(f\"Error: {filename} : {e.strerror}\")\n"," file_list = [] # clear the list"]}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse 
PySpark","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"host":{"synapse_widget":{"state":{},"token":"a69b4b72-86b0-4373-b695-ef01cd53bbb1"},"trident":{"lakehouse":{"default_lakehouse":"9fbacb3e-d0df-43a4-814b-abe4cb623a81","known_lakehouses":"[{\"id\":\"9fbacb3e-d0df-43a4-814b-abe4cb623a81\"}]"}}},"language":"python","ms_spell_check":{"ms_spell_check_language":"en"}},"notebook_environment":{},"nteract":{"version":"nteract-front-end@1.0.0"},"save_output":true,"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{},"enableDebugMode":false}},"synapse_widget":{"state":{},"version":"0.1"},"trident":{"lakehouse":{"default_lakehouse":"9fbacb3e-d0df-43a4-814b-abe4cb623a81","default_lakehouse_name":"businessCentral","default_lakehouse_workspace_id":"21a92229-a0fb-4256-86bd-4b847b8006ed","known_lakehouses":[{"id":"9fbacb3e-d0df-43a4-814b-abe4cb623a81"}]}},"widgets":{}},"nbformat":4,"nbformat_minor":5}