diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 093eea5..fb063e9 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -3,11 +3,10 @@ name: Run tests with coverage on: push: branches: - - scala + - main pull_request: branches: - - scala - + - main env: PROJECT_NAME: Arcane.Stream.MicrosoftSynapseLink REGISTRY: ghcr.io diff --git a/.helm/.helmignore b/.helm/.helmignore new file mode 100644 index 0000000..0e8a0eb --- /dev/null +++ b/.helm/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/.helm/Chart.yaml b/.helm/Chart.yaml new file mode 100644 index 0000000..5f6a976 --- /dev/null +++ b/.helm/Chart.yaml @@ -0,0 +1,8 @@ +apiVersion: v2 +name: arcane-stream-microsoft-synapse-link +description: Microsoft Synapse Link Stream for Arcane Operator +type: application + +version: 0.0.0 + +appVersion: 0.0.0 diff --git a/.helm/templates/_helpers.tpl b/.helm/templates/_helpers.tpl new file mode 100644 index 0000000..c50a3e4 --- /dev/null +++ b/.helm/templates/_helpers.tpl @@ -0,0 +1,131 @@ +{{/* +Expand the name of the chart. +*/}} +{{- define "app.name" -}} +{{- default .Chart.Name | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Create a default fully qualified app name. +We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). +If release name contains chart name it will be used as a full name. +*/}} +{{- define "app.fullname" -}} +{{- $name := .Chart.Name }} +{{- if contains .Release.Name $name}} +{{- .Release.Name | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} +{{- end }} +{{- end }} + +{{/* +Create chart name and version as used by the chart label. +*/}} +{{- define "app.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Common labels +*/}} +{{- define "app.labels" -}} +helm.sh/chart: {{ include "app.chart" . }} +{{ include "app.selectorLabels" . }} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- with .Values.jobTemplateSettings.additionalLabels }} +{{ toYaml . }} +{{- end }} +{{- end }} + +{{/* +Selector labels +*/}} +{{- define "app.selectorLabels" -}} +app.kubernetes.io/name: {{ include "app.name" . }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} + +{{/* +Create the name of the service account to use +*/}} +{{- define "app.serviceAccountName" -}} +{{- if .Values.serviceAccount.create }} +{{- default (include "app.fullname" .) .Values.serviceAccount.name }} +{{- else }} +{{- default "default" .Values.serviceAccount.name }} +{{- end }} +{{- end }} + +{{/* +Generage image reference based on image repository and tag +*/}} +{{- define "app.image" -}} +{{- printf "%s:%s" .Values.image.repository (default (printf "v%s" .Chart.AppVersion) .Values.image.tag) }} +{{- end }} + +{{/* +Stream class labels +*/}} +{{- define "streamclass.labels" -}} +helm.sh/chart: {{ include "app.chart" . }} +{{ include "app.selectorLabels" . 
}} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- with .Values.additionalLabels }} +{{ toYaml . }} +{{- end }} +{{- end }} + +{{/* +Generate the job editor cluster role name +*/}} +{{- define "app.clusterRole.jobEditor" -}} +{{- if .Values.rbac.clusterRole.jobEditor.nameOverride }} +{{- .Values.rbac.clusterRole.jobEditor.nameOverride }} +{{- else }} +{{- printf "%s-job-editor" (include "app.fullname" .) }} +{{- end }} +{{- end }} + + +{{/* +Generate the Synapse CR viewer cluster role name +*/}} +{{- define "app.clusterRole.synapseStreamViewer" -}} +{{- if .Values.rbac.clusterRole.synapseStreamViewer.nameOverride }} +{{- .Values.rbac.clusterRole.synapseStreamViewer.nameOverride }} +{{- else }} +{{- printf "%s-viewer" (include "app.fullname" .) }} +{{- end }} +{{- end }} + +{{/* +Generate the CR editor cluster role name +*/}} +{{- define "app.clusterRole.synapseStreamEditor" -}} +{{- if .Values.rbac.clusterRole.synapseStreamEditor.nameOverride }} +{{- .Values.rbac.clusterRole.synapseStreamEditor.nameOverride }} +{{- else }} +{{- printf "%s-editor" (include "app.fullname" .) }} +{{- end }} +{{- end }} + +{{/* +Job template standard labels +*/}} +{{- define "job.labels" -}} +helm.sh/chart: {{ include "app.chart" . }} +{{ include "app.selectorLabels" . }} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +{{- with .Values.jobTemplateSettings.additionalLabels }} +{{ toYaml . }} +{{- end }} +{{- end }} diff --git a/.helm/templates/cluster-role-binding-job-editor.yaml b/.helm/templates/cluster-role-binding-job-editor.yaml new file mode 100644 index 0000000..e99339c --- /dev/null +++ b/.helm/templates/cluster-role-binding-job-editor.yaml @@ -0,0 +1,25 @@ +{{- if and .Values.rbac.clusterRole.jobEditor.create .Values.rbac.clusterRoleBindings.create -}} + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: {{ include "app.clusterRole.jobEditor" . }} + labels: + {{- include "app.labels" $ | nindent 4 }} + {{- with .Values.rbac.clusterRoleBindings.additionalLabels }} + {{- toYaml . | nindent 4 }} + {{- end }} + {{- with .Values.rbac.clusterRoleBindings.additionalAnnotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +subjects: + - kind: ServiceAccount + name: {{ template "app.serviceAccountName" . }} + namespace: {{ .Release.Namespace }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {{ template "app.clusterRole.jobEditor" . }} + +{{- end }} diff --git a/.helm/templates/cluster-role-binding-synapse-editor.yaml b/.helm/templates/cluster-role-binding-synapse-editor.yaml new file mode 100644 index 0000000..a7c3126 --- /dev/null +++ b/.helm/templates/cluster-role-binding-synapse-editor.yaml @@ -0,0 +1,25 @@ +{{- if and .Values.rbac.clusterRole.synapseStreamEditor.create .Values.rbac.clusterRoleBindings.create -}} + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: {{ template "app.serviceAccountName" . }}-synapse-editor + labels: + {{- include "app.labels" $ | nindent 4 }} + {{- with .Values.rbac.clusterRoleBindings.additionalLabels }} + {{- toYaml . | nindent 4 }} + {{- end }} + {{- with .Values.rbac.clusterRoleBindings.additionalAnnotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +subjects: + - kind: ServiceAccount + name: {{ template "app.serviceAccountName" . 
}} + namespace: {{ .Release.Namespace }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {{ template "app.clusterRole.synapseStreamEditor" . }} + +{{- end }} diff --git a/.helm/templates/cluster-role-binding-synapse-viewer.yaml b/.helm/templates/cluster-role-binding-synapse-viewer.yaml new file mode 100644 index 0000000..40ed5b2 --- /dev/null +++ b/.helm/templates/cluster-role-binding-synapse-viewer.yaml @@ -0,0 +1,25 @@ +{{- if and .Values.rbac.clusterRole.synapseStreamViewer.create .Values.rbac.clusterRoleBindings.create -}} + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: {{ template "app.serviceAccountName" . }}-synapse-viewer + labels: + {{- include "app.labels" $ | nindent 4 }} + {{- with .Values.rbac.clusterRoleBindings.additionalLabels }} + {{- toYaml . | nindent 4 }} + {{- end }} + {{- with .Values.rbac.clusterRoleBindings.additionalAnnotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +subjects: + - kind: ServiceAccount + name: {{ template "app.serviceAccountName" . }} + namespace: {{ .Release.Namespace }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {{ template "app.clusterRole.synapseStreamViewer" . }} + +{{- end }} diff --git a/.helm/templates/cluster-role-crd-synapse-editor.yaml b/.helm/templates/cluster-role-crd-synapse-editor.yaml new file mode 100644 index 0000000..db7cc24 --- /dev/null +++ b/.helm/templates/cluster-role-crd-synapse-editor.yaml @@ -0,0 +1,27 @@ +{{- if .Values.rbac.clusterRole.synapseStreamEditor.create -}} +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: {{ include "app.clusterRole.synapseStreamEditor" . }} + labels: + rbac.authorization.k8s.io/aggregate-to-edit: "true" + {{- include "app.labels" $ | nindent 4 }} + {{- with .Values.rbac.clusterRole.synapseStreamEditor.additionalLabels }} + {{- toYaml . | nindent 4 }} + {{- end }} + {{- with .Values.rbac.clusterRole.synapseStreamEditor.additionalAnnotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +rules: + - verbs: + - create + - update + - patch + - delete + apiGroups: + - streaming.sneaksanddata.com + resources: + - microsoft-synapse-streams + - microsoft-synapse-streams/status +{{- end }} diff --git a/.helm/templates/cluster-role-crd-synapse-viewer.yaml b/.helm/templates/cluster-role-crd-synapse-viewer.yaml new file mode 100644 index 0000000..13b1c8f --- /dev/null +++ b/.helm/templates/cluster-role-crd-synapse-viewer.yaml @@ -0,0 +1,27 @@ +{{- if .Values.rbac.clusterRole.synapseStreamViewer.create -}} +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: {{ include "app.clusterRole.synapseStreamViewer" . }} + labels: + rbac.authorization.k8s.io/aggregate-to-view: "true" + rbac.authorization.k8s.io/aggregate-to-edit: "true" + {{- include "app.labels" $ | nindent 4 }} + {{- with .Values.rbac.clusterRole.synapseStreamViewer.additionalLabels }} + {{- toYaml . | nindent 4 }} + {{- end }} + {{- with .Values.rbac.clusterRole.synapseStreamViewer.additionalAnnotations }} + annotations: + {{- toYaml . 
| nindent 4 }} + {{- end }} +rules: + - verbs: + - get + - list + - watch + apiGroups: + - streaming.sneaksanddata.com + resources: + - microsoft-synapse-streams + - microsoft-synapse-streams/status +{{- end }} diff --git a/.helm/templates/cluster-role-job-editor.yaml b/.helm/templates/cluster-role-job-editor.yaml new file mode 100644 index 0000000..a68ebf2 --- /dev/null +++ b/.helm/templates/cluster-role-job-editor.yaml @@ -0,0 +1,23 @@ +{{- if .Values.rbac.clusterRole.jobEditor.create -}} +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: {{ include "app.clusterRole.jobEditor" . }} + labels: + rbac.authorization.k8s.io/aggregate-to-edit: "true" + {{- include "app.labels" $ | nindent 4 }} + {{- with .Values.rbac.clusterRole.jobEditor.additionalLabels }} + {{- toYaml . | nindent 4 }} + {{- end }} + {{- with .Values.rbac.clusterRole.jobEditor.additionalAnnotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +rules: + - verbs: + - patch + apiGroups: + - batch + resources: + - jobs +{{- end }} diff --git a/.helm/templates/crd-microsoft-synapse.yaml b/.helm/templates/crd-microsoft-synapse.yaml new file mode 100644 index 0000000..deef067 --- /dev/null +++ b/.helm/templates/crd-microsoft-synapse.yaml @@ -0,0 +1,285 @@ +{{- if .Values.customResourceDefinitions.create }} +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: microsoft-synapse-streams.streaming.sneaksanddata.com +spec: + group: streaming.sneaksanddata.com + scope: Namespaced + names: + plural: microsoft-synapse-streams + singular: microsoft-synapse-streams + kind: MicrosoftSynapseStream + shortNames: + - mssynapsestream + versions: + - name: v1beta1 + served: true + storage: true + additionalPrinterColumns: + - name: Source location + type: string + jsonPath: .spec.baseLocation + - name: Entity + type: string + jsonPath: .spec.entityName + - name: Refresh Interval + type: string + jsonPath: .spec.changeCaptureIntervalSeconds + - name: Sink location + type: string + jsonPath: .spec.sinkLocation + - name: Phase + type: string + jsonPath: .status.phase + subresources: + status: {} + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + partitionExpression: + type: string + name: + type: string + connectionStringRef: + description: | + Name of the secret containing the connection string. + The secret should have a key named 'ARCANE_CONNECTIONSTRING'. + type: object + properties: + name: + type: string + jobTemplateRef: + description: | + Name of the job template to be used for the streaming job if stream is running in normal mode. + type: object + properties: + name: + type: string + kind: + type: string + apiGroup: + type: string + backfillJobTemplateRef: + description: | + Name of the job template to be used for the streaming job if stream is running in the backfill mode. + type: object + properties: + name: + type: string + kind: + type: string + apiGroup: + type: string + httpClientMaxRetries: + type: integer + description: Max number of retries on blob reads for the http client. + httpClientRetryDelaySeconds: + type: integer + description: Max retry delay on blob reads for the http client. + baseLocation: + type: string + description: Location root for CDM entities, in Proteus format. + entityName: + type: string + description: Name of a CDM entity to stream. + changeCaptureIntervalSeconds: + type: integer + description: How long to wait before polling for next result set. Can be from 1 to 1 hour. 
+ minimum: 1 + maximum: 3600 + rowsPerGroup: + type: integer + description: Number of rows per parquet rowgroup. + groupingIntervalSeconds: + type: integer + description: Max time to wait for rowsPerGroup to accumulate. Can be from 1 to 60 seconds. + minimum: 1 + maximum: 60 + groupsPerFile: + type: integer + description: Number of row groups per file. + sinkLocation: + type: string + description: Data location for parquet files. + lookBackInterval: + type: integer + description: | + Number of seconds to look back when determining first set of changes to extract. + Can be set in interval from 1 second to 10 hours. Default is 1 hour. + minimum: 1 + maximum: 36000 + default: 3600 + schemaUpdateIntervalSeconds: + type: integer + description: | + How often to check for schema updates. Can be set in interval from 1 second to 1 hour. + Default is 10 seconds. + minimum: 1 + maximum: 3600 + default: 10 + catalogAuthSecretRef: + description: | + Name of the secret containing the catalog authentication. + The secret should have the following mandatory keys: + - ARCANE_FRAMEWORK__S3_CATALOG_AUTH_CLIENT_URI + - ARCANE_FRAMEWORK__S3_CATALOG_AUTH_SCOPE + And the authentication information in the following keys: + - ARCANE_FRAMEWORK__S3_CATALOG_AUTH_CLIENT_ID + - ARCANE_FRAMEWORK__S3_CATALOG_AUTH_CLIENT_SECRET + OR + - ARCANE_FRAMEWORK__S3_CATALOG_AUTH_INIT_TOKEN + + The authentication information is mutually exclusive. + If init token is provided, client id and secret are not used. + type: object + properties: + name: + type: string + catalog: + type: object + properties: + namespace: + type: string + warehouse: + type: string + authSecretRef: + description: | + Not used in the operator version 0.0.x + In operator version 0.1.x and above this field will replace the `catalogAuthSecretRef` field. + type: object + properties: + name: + type: string + mergeService: + type: object + properties: + uri: + type: string + authSecretRef: + description: | + Not used in the operator version 0.0.x + In operator version 0.1.x and above this field will replace the `mergeAuthSecretRef` field. + type: object + properties: + name: + type: string + mergeAuthSecretRef: + description: | + Name of the secret containing the data storage authentication for the merge service. + The secret should have a key named 'ARCANE_FRAMEWORK__MERGE_SERVICE__JDBC_URL'. + type: object + properties: + name: + type: string + stagingStorageAuthSecretRef: + description: | + Name of the secret containing the data storage authentication for the S3 bucket containing staging data. + The secret should have the following mandatory keys: + - ARCANE_FRAMEWORK__S3_CATALOG_SECRET_ACCESS_KEY + - ARCANE_FRAMEWORK__S3_CATALOG_ACCESS_KEY_ID + - ARCANE_FRAMEWORK__S3_CATALOG_ENDPOINT + - ARCANE_FRAMEWORK__S3_CATALOG_REGION + - AWS_REGION + The AWS_REGION key is used to set the region for the AWS SDK and it's value should be + the same as ARCANE_FRAMEWORK__S3_STAGING_CATALOG_REGION. + type: object + properties: + name: + type: string + stagingStorage: + type: object + properties: + authSecretRef: + type: object + description: | + Not used in the operator version 0.0.x + In operator version 0.1.x and above this field will replace the `stagingStorageAuthSecretRef` field. 
+ properties: + name: + type: string + streamMetadata: + type: object + default: null + properties: + datePartition: + type: object + properties: + description: + type: string + fieldName: + type: string + fieldFormat: + type: string + fieldExpression: + type: string + partitions: + type: array + items: + type: object + properties: + description: + type: string + fieldName: + type: string + fieldFormat: + type: string + stagingLocation: + type: string + description: Data location for staging files. + retention: + type: object + properties: + retentionPeriod: + type: integer + description: Number of days to retain data. + retentionUnit: + default: hours + type: string + enum: + - days + - hours + - minutes + location: + type: string + description: Data location for retention table. + status: + type: object + properties: + phase: + type: string + enum: + - RESTARTING + - RUNNING + - RELOADING + - TERMINATING + - STOPPED + - SUSPENDED + - FAILED + conditions: + type: array + items: + type: object + required: + - status + - type + properties: + message: + type: string + type: + type: string + enum: + - WARNING + - ERROR + - INFO + - READY + status: + type: string + enum: + - "True" + - "False" + {{- end }} diff --git a/.helm/templates/serviceaccount.yaml b/.helm/templates/serviceaccount.yaml new file mode 100644 index 0000000..536878e --- /dev/null +++ b/.helm/templates/serviceaccount.yaml @@ -0,0 +1,15 @@ +{{- if .Values.serviceAccount.create -}} +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ template "app.serviceAccountName" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "app.labels" $ | nindent 4 }} + {{- if .Values.serviceAccount.annotations }} + annotations: + {{- with .Values.serviceAccount.annotations }} + {{- toYaml . | nindent 4 }} + {{- end }} + {{- end }} +{{- end -}} diff --git a/.helm/templates/streamclass-synapse.yaml b/.helm/templates/streamclass-synapse.yaml new file mode 100644 index 0000000..cd0df9c --- /dev/null +++ b/.helm/templates/streamclass-synapse.yaml @@ -0,0 +1,20 @@ +apiVersion: streaming.sneaksanddata.com/v1beta1 +kind: StreamClass +metadata: + name: {{ template "app.name" . }}-synapse + namespace: {{ .Release.Namespace }} + labels: + {{- include "streamclass.labels" $ | nindent 4 }} + {{- if .Values.additionalAnnotations }} + annotations: + {{- with .Values.additionalAnnotations }} + {{- toYaml . | nindent 4 }} + {{- end }} + {{- end }} +spec: + apiGroupRef: streaming.sneaksanddata.com + kindRef: MicrosoftSynapseStream + apiVersion: v1beta1 + pluralName: microsoft-synapse-streams + secretRefs: + - connectionStringRef diff --git a/.helm/templates/streaming-job-template.yaml b/.helm/templates/streaming-job-template.yaml new file mode 100644 index 0000000..751f24e --- /dev/null +++ b/.helm/templates/streaming-job-template.yaml @@ -0,0 +1,104 @@ +{{- if and .Values.jobTemplateSettings.create -}} +apiVersion: streaming.sneaksanddata.com/v1 +kind: StreamingJobTemplate +metadata: + name: {{ include "app.name" $ }} +spec: + metadata: + labels: + {{- include "job.labels" $ | nindent 6 }} + {{- with .Values.jobTemplateSettings.additionalAnnotations }} + annotations: + {{- toYaml . | nindent 6}} + {{- end }} + template: + apiVersion: batch/v1 + kind: job + spec: + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . 
| nindent 8 }} + {{- end }} + backoffLimit: {{ .Values.jobTemplateSettings.backoffLimit }} + ttlSecondsAfterFinished: 1 + template: + metadata: + labels: + {{- include "app.labels" $ | nindent 12 }} + {{- with .Values.jobTemplateSettings.additionalAnnotations }} + annotations: + {{- toYaml . | nindent 12 }} + {{- end }} + spec: + {{- with .Values.jobTemplateSettings.securityContext }} + securityContext: + {{- toYaml . | nindent 12 }} + {{- end }} + containers: + - name: arcane-stream + image: "{{ include "app.image" . }}" + imagePullPolicy: "{{ .Values.image.pullPolicy }}" + {{- with .Values.jobTemplateSettings.securityContext }} + securityContext: + {{- toYaml . | nindent 16 }} + {{- end }} + env: + - name: ASPNETCORE_ENVIRONMENT + value: {{ .Values.environment }} + - name: APPLICATION_VERSION + value: "{{ (default (printf "v%s" .Chart.AppVersion) .Values.image.tag) }}" + {{- with .Values.jobTemplateSettings.extraEnv }} + {{- toYaml . | nindent 16 }} + {{- end }} + {{- if .Values.jobTemplateSettings.extraEnvFrom }} + envFrom: + {{- with .Values.jobTemplateSettings.extraEnvFrom }} + {{- toYaml . | nindent 16 }} + {{- end }} + {{- end }} + {{- if .Values.jobTemplateSettings.extraVolumeMounts }} + volumeMounts: + {{- with .Values.jobTemplateSettings.extraVolumeMounts }} + {{- toYaml . | nindent 16 }} + {{- end }} + {{- end }} + {{- with .Values.jobTemplateSettings.resources }} + resources: + {{- toYaml . | nindent 16 }} + {{- end }} + restartPolicy: Never + {{- if .Values.jobTemplateSettings.extraVolumes }} + volumes: + {{- with .Values.jobTemplateSettings.extraVolumes }} + {{- toYaml . | nindent 12 }} + {{- end }} + {{- end }} + serviceAccountName: {{ include "app.serviceAccountName" . }} + {{- with .Values.jobTemplateSettings.tolerations }} + tolerations: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- with .Values.jobTemplateSettings.affinity }} + affinity: + {{- toYaml . | nindent 12 }} + {{- end }} + podFailurePolicy: + rules: + - action: Ignore + onExitCodes: + operator: In + {{- with .Values.jobTemplateSettings.podFailurePolicySettings.retryOnExitCodes }} + values: + {{- toYaml . | nindent 16 }} + {{- end }} + - action: FailJob + onExitCodes: + operator: In + {{- with .Values.jobTemplateSettings.podFailurePolicySettings.failOnExitCodes }} + values: + {{- toYaml . | nindent 16 }} + {{- end }} + - action: Ignore + onPodConditions: + - type: DisruptionTarget +{{- end }} diff --git a/.helm/values.yaml b/.helm/values.yaml new file mode 100644 index 0000000..a19b7b9 --- /dev/null +++ b/.helm/values.yaml @@ -0,0 +1,175 @@ +environment: "Development" +replicaCount: 1 + +image: + # Repository to pull the image from + repository: "ghcr.io/sneaksanddata/arcane-stream-microsoft-synapse-link" + + # Tag to pull (defaults to the chart appVersion) + tag: "" + + # Image pull policy + pullPolicy: "IfNotPresent" + +# Image pull secrets for private repositories +imagePullSecrets: [ ] + +# Override the application name +nameOverride: "" + +# Fullname override +fullnameOverride: "" + +serviceAccount: + # Specifies whether a service account should be created + create: true + # Annotations to add to the service account + annotations: { } + # The name of the service account to use. 
+  # If not set and create is true, a name is generated using the fullname template
+  name: ""
+
+# CRD configuration
+customResourceDefinitions:
+
+  # Set to true to create CRDs for this operator
+  # Otherwise, the operator will expect the CRDs to be pre-installed
+  create: true
+
+rbac:
+  # Specifies whether RBAC resources should be created
+  clusterRole:
+
+    # Allows the service account to list and view custom resources
+    synapseStreamViewer:
+      additionalLabels: { }
+      additionalAnnotations: { }
+      create: true
+      nameOverride: ""
+
+    # Allows the service account to update custom resources
+    synapseStreamEditor:
+      additionalLabels: { }
+      additionalAnnotations: { }
+      create: true
+      nameOverride: ""
+
+    # Allows the streaming plugin to report schema mismatch status to the Operator
+    jobEditor:
+      additionalLabels: { }
+      additionalAnnotations: { }
+      create: true
+      nameOverride: ""
+
+  # Determines whether ClusterRoleBinding resources should be created.
+  # If any of the cluster roles above are set to 'true', this should also be set to 'true'.
+  clusterRoleBindings:
+    additionalLabels: { }
+    additionalAnnotations: { }
+    create: true
+
+# Additional labels for the stream classes
+additionalLabels: { }
+# Example:
+#
+# app.my-company.com/name: arcane-stream
+# app.my-company.com/component: streaming
+
+# Additional annotations for the stream classes
+additionalAnnotations: { }
+# Example:
+#
+# app.my-company.com/name: arcane-stream
+# app.my-company.com/source: restapi
+# app.my-company.com/application: arcane
+
+jobTemplateSettings:
+  create: true
+
+  # Job backoff limit
+  backoffLimit: 3
+
+  # Additional labels for the streaming jobs and pods
+  additionalLabels: { }
+  # Example:
+  #
+  # app.my-company.com/name: arcane-stream
+  # app.my-company.com/component: streaming
+
+  # Additional annotations for the streaming jobs and pods
+  additionalAnnotations: { }
+  # Example:
+  #
+  # app.my-company.com/name: arcane-stream
+  # app.my-company.com/source: restapi
+  # app.my-company.com/application: arcane
+
+  # Extra environment variables to set in the streaming job
+  extraEnv: [ ]
+  # Example:
+  #
+  # - name: ASPNETCORE_ENVIRONMENT
+  #   value: production
+
+  # Extra environment variables referencing a ConfigMap or Secret
+  extraEnvFrom: [ ]
+  # Example:
+  #
+  # envFrom:
+  #   - configMapRef:
+  #       name: custom-api-access-token
+
+  # Extra volumes to add to the streaming job
+  extraVolumes: [ ]
+  # Example:
+  #
+  # - name: data-volume
+  #   emptyDir:
+  #     sizeLimit: 500Mi
+
+  # Extra volume mounts to add to the streaming job
+  extraVolumeMounts: [ ]
+  # Example:
+  #
+  # - mountPath: /data
+  #   name: data-volume
+
+  # Resource constraints.
+  # By default, the operator does not specify any constraints, to allow for easier deployment.
+  resources: { }
+  # Example:
+  #
+  # requests:
+  #   cpu: 1
+  #   memory: 1Gi
+  # limits:
+  #   cpu: 1
+  #   memory: 1Gi
+
+  # Tolerations for pod assignment
+  tolerations: [ ]
+
+  # Affinity rules for pod assignment
+  affinity: { }
+
+  # Security context settings for the container
+  securityContext:
+    allowPrivilegeEscalation: false
+    capabilities:
+      drop:
+        - ALL
+    readOnlyRootFilesystem: true
+    runAsNonRoot: true
+    runAsUser: 1000
+    seccompProfile:
+      type: RuntimeDefault
+
+
+  # Allows specifying custom settings for the pod failure policy
+  podFailurePolicySettings:
+
+    # Specifies the list of exit codes that should trigger a retry without incrementing the retry count
+    retryOnExitCodes:
+      # The stream container completes with this exit code when a retryable error occurs,
+      # e.g. rate limiting by the server, transient connection errors, etc.
+      - 2
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/create-polaris-catalog.sh b/create-polaris-catalog.sh
new file mode 100644
index 0000000..5774e96
--- /dev/null
+++ b/create-polaris-catalog.sh
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# + +# https://github.com/apache/polaris/blob/45602456edb139c41553fb37716c151db597608b/getting-started/trino/create-polaris-catalog.sh + +PRINCIPAL_TOKEN="principal:root;realm:default-realm" + +# Use s3 filesystem by default +curl -i -X POST -H "Authorization: Bearer $PRINCIPAL_TOKEN" -H 'Accept: application/json' -H 'Content-Type: application/json' \ + http://10.1.0.4:8181/api/management/v1/catalogs \ + -d '{ + "catalog": { + "name": "polaris", + "type": "INTERNAL", + "readOnly": false, + "properties": { + "default-base-location": "s3://tmp/polaris/" + }, + "storageConfigInfo": { + "storageType": "S3", + "allowedLocations": [ + "s3://tmp/polaris/", + "s3://lakehouse/polaris/" + ], + "roleArn": "arn:aws:iam::000000000000:role/polaris-access-role" + } + } + }' +# create namespace +curl -i -X POST -H "Authorization: Bearer $PRINCIPAL_TOKEN" -H 'Accept: application/json' -H 'Content-Type: application/json' \ + http://10.1.0.4:8181/api/catalog/v1/polaris/namespaces \ + -d '{ + "namespace": [ + "test" + ], + "properties": {} + }' + +# create principal role +curl -i -X POST -H "Authorization: Bearer $PRINCIPAL_TOKEN" -H 'Accept: application/json' -H 'Content-Type: application/json' \ + http://10.1.0.4:8181/api/management/v1/principal-roles \ + -d '{ + "principalRole": { + "name": "admin" + } + }' + +# create catalog role +curl -i -X POST -H "Authorization: Bearer $PRINCIPAL_TOKEN" -H 'Accept: application/json' -H 'Content-Type: application/json' \ + http://10.1.0.4:8181/api/management/v1/catalogs/polaris/catalog-roles \ + -d '{ + "catalogRole": { + "name": "admin" + } + }' + +# assign principal role +curl -i -X PUT -H "Authorization: Bearer $PRINCIPAL_TOKEN" -H 'Accept: application/json' -H 'Content-Type: application/json' \ + http://10.1.0.4:8181/api/management/v1/principals/root/principal-roles \ + -d '{ + "principalRole": { + "name": "admin" + } + }' + +# assign principal role to catalog role +curl -i -X PUT -H "Authorization: Bearer $PRINCIPAL_TOKEN" -H 'Accept: application/json' -H 'Content-Type: application/json' \ + http://10.1.0.4:8181/api/management/v1/principal-roles/admin/catalog-roles/polaris \ + -d '{ + "catalogRole": { + "name": "admin" + } + }' + +# add grant to catalog role +curl -i -X PUT -H "Authorization: Bearer $PRINCIPAL_TOKEN" -H 'Accept: application/json' -H 'Content-Type: application/json' \ + http://10.1.0.4:8181/api/management/v1/catalogs/polaris/catalog-roles/admin/grants \ + -d '{ + "grant": { + "type": "catalog", + "privilege": "CATALOG_MANAGE_CONTENT" + } + }' \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..b096620 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,122 @@ +version: '3.3' +networks: + mesh: + driver: bridge + ipam: + config: + - subnet: 10.1.0.0/16 + gateway: 10.1.0.1 + +services: + minio: + container_name: minio + hostname: minio-e2e + image: quay.io/minio/minio + restart: always + networks: + mesh: + ipv4_address: 10.1.0.2 + command: + - server + - /data + - "--console-address" + - ":9001" + ports: + - "9000:9000" + - "9001:9001" + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:9001" ] + interval: 30s + timeout: 10s + retries: 5 + prepare_buckets: + container_name: minio-setup + image: quay.io/minio/minio + networks: + mesh: + ipv4_address: 10.1.0.3 + depends_on: + minio: + condition: service_healthy + entrypoint: + - "/bin/sh" + - "-c" + - | + mc alias set e2e "http://10.1.0.2:9000" minioadmin minioadmin + mc admin info e2e + mc mb e2e/tmp && mc mb e2e/lakehouse 
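+  # The tmp and lakehouse buckets created above back the Polaris catalog: create-polaris-catalog.sh
+  # registers s3://tmp/polaris/ and s3://lakehouse/polaris/ as allowed locations, and the stream spec
+  # in integration-tests.env stages data under s3://tmp/polaris/test.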
+ # https://github.com/databricks/docker-spark-iceberg/blob/main/docker-compose.yml + # note there is no publicly available image for Polaris yet + # follow guidelines from https://github.com/apache/polaris?tab=readme-ov-file#more-build-and-run-options to build one + polaris: + container_name: polaris + image: ghcr.io/sneaksanddata/apache-polaris:latest + restart: always + depends_on: + prepare_buckets: + condition: service_completed_successfully + networks: + mesh: + ipv4_address: 10.1.0.4 + ports: + - "8181:8181" + - "8182:8182" + environment: + AWS_REGION: us-east-1 + AWS_ENDPOINT_URL_S3: http://10.1.0.2:9000 + AWS_ENDPOINT_URL_STS: http://10.1.0.2:9000 + # add aws keys as dropwizard config + JAVA_OPTS: -Ddw.awsAccessKey=minioadmin -Ddw.awsSecretKey=minioadmin + + healthcheck: + test: ["CMD", "curl", "http://localhost:8182/healthcheck"] + interval: 10s + timeout: 10s + retries: 5 + create-polaris-catalog: + image: curlimages/curl + networks: + mesh: + ipv4_address: 10.1.0.5 + depends_on: + polaris: + condition: service_healthy + volumes: + - ./create-polaris-catalog.sh:/create-polaris-catalog.sh + command: ["/bin/sh", "/create-polaris-catalog.sh"] + azurite: + image: mcr.microsoft.com/azure-storage/azurite + restart: always + networks: + mesh: + ipv4_address: 10.1.0.6 + command: + - "azurite-blob" + - "--blobHost" + - "10.1.0.6" + - "--blobPort" + - "10001" + ports: + - "10001:10001" + create-cdm-container: + image: python:3.11-slim-bookworm + depends_on: + - azurite + networks: + mesh: + ipv4_address: 10.1.0.7 + volumes: + - ./populate-cdm-container.py:/populate-cdm-container.py + command: [ "/bin/sh", "-c", "pip install azure-storage-blob requests && python /populate-cdm-container.py" ] + trino: + depends_on: + polaris: + condition: service_healthy + networks: + mesh: + ipv4_address: 10.1.0.9 + ports: + - "8080:8080" + image: "trinodb/trino:455" + volumes: + - ./integration-tests.properties:/etc/trino/catalog/iceberg.properties diff --git a/integration-tests.env b/integration-tests.env new file mode 100644 index 0000000..6eb103c --- /dev/null +++ b/integration-tests.env @@ -0,0 +1,20 @@ +STREAMCONTEXT__BACKFILL=false +STREAMCONTEXT__SPEC='{ "backfillJobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-cdm-change-feed-large-job" }, "baseLocation": "abfss://cdm-e2e@devstoreaccount1.dfs.core.windows.net/", "changeCaptureIntervalSeconds": 5, "entityName": "DimensionAttributeLevelValue", "groupingIntervalSeconds": 3, "groupsPerFile": 1, "httpClientMaxRetries": 3, "httpClientRetryDelaySeconds": 1, "jobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-cdm-change-feed-standard-job" }, "lookBackInterval": 21600, "rowsPerGroup": 10000, "schemaUpdateIntervalSeconds": 10, "sinkLocation": "integration_tests", "catalogSettings": { "namespace": "test", "warehouse": "polaris", "catalogUri": "http://localhost:8181/api/catalog" }, "name": "dimensionattributelevelvalue", "stagingLocation": "s3://tmp/polaris/test", "partitionExpression": ""}' +STREAMCONTEXT__STREAM_ID=test +STREAMCONTEXT__STREAM_KIND=CdmChangeFeed +APPLICATION_VERSION=0.0.1 +ARCANE_DATADOG_ENDPOINT=tcp-intake.logs.datadoghq.eu:443 +ARCANE_FRAMEWORK__S3_CATALOG_ACCESS_KEY_ID=minioadmin +ARCANE_FRAMEWORK__S3_CATALOG_SECRET_ACCESS_KEY=minioadmin +ARCANE_FRAMEWORK__S3_CATALOG_AUTH_INIT_TOKEN="principal:root;realm:default-realm" 
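+# The init token above is the Polaris root principal credential also used by create-polaris-catalog.sh.
+# Per the catalogAuthSecretRef description in the CRD, the init token and client id/secret are mutually
+# exclusive, so no ARCANE_FRAMEWORK__S3_CATALOG_AUTH_CLIENT_ID / _CLIENT_SECRET values are set here.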
+ARCANE_FRAMEWORK__S3_CATALOG_AUTH_CLIENT_URI=http://localhost:8181/api/catalog/v1/oauth/tokens +ARCANE_FRAMEWORK__S3_CATALOG_AUTH_SCOPE=PRINCIPAL_ROLE:ALL +ARCANE_FRAMEWORK__S3_CATALOG_ENDPOINT=http://localhost:9000 +AWS_REGION=us-east-1 +ARCANE_FRAMEWORK__CDM_TEST_TABLE=dimensionattributelevelvalue +ARCANE_FRAMEWORK__STORAGE_ACCESS_KEY=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== +ARCANE_FRAMEWORK__STORAGE_ACCOUNT=devstoreaccount1 +ARCANE_FRAMEWORK__STORAGE_CONTAINER=cdm-e2e +ARCANE_FRAMEWORK__STORAGE_ENDPOINT=http://localhost:10001/devstoreaccount1 +ARCANE_FRAMEWORK__MERGE_SERVICE_CONNECTION_URI=jdbc:trino://localhost:8080/iceberg/test?user=test + diff --git a/integration-tests.properties b/integration-tests.properties new file mode 100644 index 0000000..b7bd602 --- /dev/null +++ b/integration-tests.properties @@ -0,0 +1,18 @@ +# Configuration for the Trino Iceberg connector + +connector.name=iceberg +iceberg.catalog.type=rest +iceberg.rest-catalog.uri=http://10.1.0.4:8181/api/catalog +iceberg.rest-catalog.security=OAUTH2 +iceberg.rest-catalog.oauth2.token=principal:root;realm:default-realm +iceberg.rest-catalog.warehouse=polaris + + +fs.native-s3.enabled=true +s3.region=us-east-1 +s3.path-style-access=true +s3.endpoint=http://10.1.0.2:9000 +s3.sts.endpoint=http://10.1.0.2:9000 +s3.aws-access-key=minioadmin +s3.aws-secret-key=minioadmin + diff --git a/populate-cdm-container.py b/populate-cdm-container.py new file mode 100644 index 0000000..9b1b422 --- /dev/null +++ b/populate-cdm-container.py @@ -0,0 +1,666 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from azure.storage.blob import BlobServiceClient +from datetime import datetime, timedelta + +import os + +CONTENT = """50bff458-d47a-4924-804b-31c0a83108e6,"1/1/2020 0:00:00 PM","1/1/2020 0:00:00 PM",0,1111000000,1111000010,"F1234567",1,,"2020-01-01T00:15:00.0000000Z","acc1",111111110,"2020-01-01T00:15:00.0000000Z","acc1",0,"dat",1,1111000001,2111000001,1111000001,21111,2111000001,"2020-01-01T00:15:00.0000000+00:00","2020-01-01T00:15:00.0000000Z", +5b4bc74e-2132-4d8e-8572-48ce4260f182,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000001,1111000011,"F1234568",1,,"2020-01-01T00:16:00.0000000Z","acc2",111111111,"2020-01-01T00:16:00.0000000Z","acc2",0,"dat",1,1111000002,2111000002,1111000001,21111,2111000001,"2020-01-01T00:16:00.0000000+00:00","2020-01-01T00:16:00.0000000Z", +aae2094d-cd17-42b4-891e-3b268e2b6713,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000002,1111000012,"F1234569",1,,"2020-01-01T00:17:00.0000000Z","acc2",111111112,"2020-01-01T00:17:00.0000000Z","acc2",0,"dat",1,1111000003,2111000003,1111000001,21111,2111000001,"2020-01-01T00:17:00.0000000+00:00","2020-01-01T00:17:00.0000000Z", +9633be9a-c485-4afa-8bb7-4ba380eaa206,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000003,1111000013,"F1234578",1,,"2020-01-01T00:18:00.0000000Z","acc1",111111113,"2020-01-01T00:18:00.0000000Z","acc1",0,"dat",1,1111000004,2111000004,1111000001,21111,2111000001,"2020-01-01T00:18:00.0000000+00:00","2020-01-01T00:18:00.0000000Z", +b62c7b67-b8f8-4635-8cef-1c23591d5c4c,"1/1/2020 0:00:01 PM","1/1/2020 0:00:01 PM",0,1111000004,1111000014,"F1234511",1,,"2020-01-01T00:19:00.0000000Z","acc2",111111114,"2020-01-01T00:19:00.0000000Z","acc2",0,"dat",1,1111000005,2111000005,1111000001,21111,2111000001,"2020-01-01T00:19:00.0000000+00:00","2020-01-01T00:19:00.0000000Z", +""" + +MODEL_JSON = """{ + "name": "cdm", + "description": "cdm", + "version": "1.0", + "entities": [ + { + "$type": "LocalEntity", + "name": "currency", + "description": "currency", + "annotations": [ + { + "name": "Athena:PartitionGranularity", + "value": "Year" + }, + { + "name": "Athena:InitialSyncState", + "value": "Completed" + }, + { + "name": "Athena:InitialSyncDataCompletedTime", + "value": "1/1/2020 0:00:00 PM" + } + ], + "attributes": [ + { + "name": "Id", + "dataType": "guid", + "maxLength": -1 + }, + { + "name": "SinkCreatedOn", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "SinkModifiedOn", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "iseuro", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "roundofftypeassetdep_jp", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "roundofftypeprice", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "roundofftypepurch", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "roundofftypesales", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "ltmroundofftypelineamount", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "sysdatastatecode", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "currencycode", + "dataType": "string", + "maxLength": 3, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 3 + } + ] + } + ] + }, + { + "name": "currencycodeiso", + "dataType": "string", + "maxLength": 3, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 3 + } + ] + } + ] + }, + { + "name": "roundingprecision", + "dataType": "decimal", + "maxLength": -1, + 
"cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "roundoffassetdep_jp", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "roundoffprice", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "roundoffpurch", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "roundoffsales", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "symbol", + "dataType": "string", + "maxLength": 5, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 5 + } + ] + } + ] + }, + { + "name": "txt", + "dataType": "string", + "maxLength": 120, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 120 + } + ] + } + ] + }, + { + "name": "exchratemaxvariationpercent_mx", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "decimalscount_mx", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "ltmroundofflineamount", + "dataType": "decimal", + "maxLength": -1, + "cdm:traits": [ + { + "traitReference": "is.dataFormat.numeric.shaped", + "arguments": [ + { + "name": "precision", + "value": 38 + }, + { + "name": "scale", + "value": 6 + } + ] + } + ] + }, + { + "name": "modifieddatetime", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "modifiedby", + "dataType": "string", + "maxLength": 20, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 20 + } + ] + } + ] + }, + { + "name": "modifiedtransactionid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "createddatetime", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "createdby", + "dataType": "string", + "maxLength": 20, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 20 + } + ] + } + ] + }, + { + "name": "createdtransactionid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "dataareaid", + "dataType": "string", + "maxLength": 4, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 4 + } + ] + } + ] + }, + { + "name": "recversion", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "partition", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "sysrowversion", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "recid", 
+ "dataType": "int64", + "maxLength": -1 + }, + { + "name": "tableid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "versionnumber", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "createdon", + "dataType": "dateTimeOffset", + "maxLength": -1 + }, + { + "name": "modifiedon", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "IsDelete", + "dataType": "boolean", + "maxLength": -1 + } + ], + "partitions": [] + }, + { + "$type": "LocalEntity", + "name": "dimensionattributelevelvalue", + "description": "dimensionattributelevelvalue", + "annotations": [ + { + "name": "Athena:PartitionGranularity", + "value": "Year" + }, + { + "name": "Athena:InitialSyncState", + "value": "Completed" + }, + { + "name": "Athena:InitialSyncDataCompletedTime", + "value": "1/1/2020 0:00:00 PM" + } + ], + "attributes": [ + { + "name": "Id", + "dataType": "guid", + "maxLength": -1 + }, + { + "name": "SinkCreatedOn", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "SinkModifiedOn", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "sysdatastatecode", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "dimensionattributevalue", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "dimensionattributevaluegroup", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "displayvalue", + "dataType": "string", + "maxLength": 30, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 30 + } + ] + } + ] + }, + { + "name": "ordinal", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "backingrecorddataareaid", + "dataType": "string", + "maxLength": 4, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 4 + } + ] + } + ] + }, + { + "name": "modifieddatetime", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "modifiedby", + "dataType": "string", + "maxLength": 20, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 20 + } + ] + } + ] + }, + { + "name": "modifiedtransactionid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "createddatetime", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "createdby", + "dataType": "string", + "maxLength": 20, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 20 + } + ] + } + ] + }, + { + "name": "createdtransactionid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "dataareaid", + "dataType": "string", + "maxLength": 4, + "cdm:traits": [ + { + "traitReference": "is.constrained", + "arguments": [ + { + "name": "maximumLength", + "value": 4 + } + ] + } + ] + }, + { + "name": "recversion", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "partition", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "sysrowversion", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "recid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "tableid", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "versionnumber", + "dataType": "int64", + "maxLength": -1 + }, + { + "name": "createdon", + "dataType": "dateTimeOffset", + "maxLength": -1 + }, + { + "name": "modifiedon", + "dataType": "dateTime", + "maxLength": -1 + }, + { + "name": "IsDelete", + "dataType": "boolean", + "maxLength": -1 + } + ], + "partitions": [] + } + ] + }""" + 
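+# The account key below is the well-known Azurite development key for devstoreaccount1 (not a real
+# secret); the blob endpoint matches the azurite service address and port from docker-compose.yaml.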
+AZURITE_CONNECTION_STRING='DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://10.1.0.6:10001/devstoreaccount1' +CONTAINER = "cdm-e2e" +# Get the current date and time +now = datetime.utcnow() + +# Subtract 6 hours +start_time = now - timedelta(hours=6) + +# Generate formatted strings for each hour +FOLDERS = [(start_time + timedelta(hours=i)).strftime("%Y-%m-%dT%H.%M.%SZ") for i in range(8)] + +def upload_blob_file(blob_service_client: BlobServiceClient, container_name: str, blob_name: str, content: str): + blob_service_client.get_container_client(container=container_name).upload_blob(name=blob_name, data=content.encode('utf-8'), overwrite=True) + +def create_container(): + # Create a container for Azurite for the first run + blob_service_client = BlobServiceClient.from_connection_string(AZURITE_CONNECTION_STRING) + try: + blob_service_client.create_container(CONTAINER) + except Exception as e: + print(e) + +def create_blobs(): + blob_service_client = BlobServiceClient.from_connection_string(AZURITE_CONNECTION_STRING) + for folder in FOLDERS: + upload_blob_file(blob_service_client, CONTAINER, f"{folder}/dimensionattributelevelvalue/2020.csv", CONTENT) + + upload_blob_file(blob_service_client, CONTAINER, "model.json", MODEL_JSON) + +create_container() +create_blobs() diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..02d3789 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/src/main/scala/main.scala b/src/main/scala/main.scala new file mode 100644 index 0000000..2ae1690 --- /dev/null +++ b/src/main/scala/main.scala @@ -0,0 +1,75 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link + +import models.app.{AzureConnectionSettings, MicrosoftSynapseLinkStreamContext} +import services.StreamGraphBuilderFactory +import services.data_providers.microsoft_synapse_link.{CdmDataProvider, CdmSchemaProvider} +import services.clients.JdbcConsumer +import services.streaming.consumers.IcebergSynapseConsumer +import services.streaming.processors.{ArchivationProcessor, CdmGroupingProcessor, MergeBatchProcessor, TypeAlignmentService} + +import com.azure.storage.common.StorageSharedKeyCredential +import com.sneaksanddata.arcane.framework.models.DataRow +import com.sneaksanddata.arcane.framework.models.app.StreamContext +import com.sneaksanddata.arcane.framework.models.settings.{GroupingSettings, VersionedDataGraphBuilderSettings} +import com.sneaksanddata.arcane.framework.services.app.base.{StreamLifetimeService, StreamRunnerService} +import com.sneaksanddata.arcane.framework.services.app.logging.base.Enricher +import com.sneaksanddata.arcane.framework.services.app.{PosixStreamLifetimeService, StreamRunnerServiceImpl} +import com.sneaksanddata.arcane.framework.services.lakehouse.IcebergS3CatalogWriter +import com.sneaksanddata.arcane.framework.services.storage.models.azure.AzureBlobStorageReader +import com.sneaksanddata.arcane.framework.services.streaming.base.{BatchProcessor, StreamGraphBuilder} +import com.sneaksanddata.arcane.framework.services.streaming.consumers.IcebergBackfillConsumer +import com.sneaksanddata.arcane.framework.services.streaming.processors.{BackfillGroupingProcessor, MergeProcessor} +import org.slf4j.MDC +import zio.* +import zio.logging.LogFormat +import zio.logging.backend.SLF4J + + +object main extends ZIOAppDefault { + + private val loggingProprieties 
= Enricher("Application", "Arcane.Stream.Scala") + ++ Enricher("App", "Arcane.Stream.Scala") + ++ Enricher.fromEnvironment("APPLICATION_VERSION", "0.0.0") + + override val bootstrap: ZLayer[Any, Nothing, Unit] = SLF4J.slf4j( + LogFormat.make{ (builder, _, _, _, line, _, _, _, _) => + loggingProprieties.enrichLoggerWith(builder.appendKeyValue) + loggingProprieties.enrichLoggerWith(MDC.put) + builder.appendText(line()) + } + ) + + private val appLayer = for + _ <- ZIO.log("Application starting") + context <- ZIO.service[StreamContext].debug("initialized stream context") + streamRunner <- ZIO.service[StreamRunnerService].debug("initialized stream runner") + _ <- streamRunner.run + yield () + + val storageExplorerLayer: ZLayer[AzureConnectionSettings, Nothing, AzureBlobStorageReader] = ZLayer { + for { + connectionOptions <- ZIO.service[AzureConnectionSettings] + credentials = StorageSharedKeyCredential(connectionOptions.account, connectionOptions.accessKey) + } yield AzureBlobStorageReader(connectionOptions.account, connectionOptions.endpoint, credentials) + } + + @main + def run: ZIO[Any, Throwable, Unit] = + appLayer.provide( + storageExplorerLayer, + CdmDataProvider.layer, + CdmSchemaProvider.layer, + MicrosoftSynapseLinkStreamContext.layer, + PosixStreamLifetimeService.layer, + StreamRunnerServiceImpl.layer, + StreamGraphBuilderFactory.layer, + IcebergS3CatalogWriter.layer, + IcebergSynapseConsumer.layer, + MergeBatchProcessor.layer, + JdbcConsumer.layer, + CdmGroupingProcessor.layer, + ArchivationProcessor.layer, + TypeAlignmentService.layer) + .orDie +} + diff --git a/src/main/scala/models/app/MicrosoftSynapseLinkStreamContext.scala b/src/main/scala/models/app/MicrosoftSynapseLinkStreamContext.scala new file mode 100644 index 0000000..19e40fd --- /dev/null +++ b/src/main/scala/models/app/MicrosoftSynapseLinkStreamContext.scala @@ -0,0 +1,122 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package models.app + +import com.sneaksanddata.arcane.framework.models.app.StreamContext +import com.sneaksanddata.arcane.framework.models.settings.{GroupingSettings, SinkSettings, VersionedDataGraphBuilderSettings} +import com.sneaksanddata.arcane.framework.services.cdm.CdmTableSettings +import com.sneaksanddata.arcane.framework.services.consumers.JdbcConsumerOptions +import com.sneaksanddata.arcane.framework.services.lakehouse.{IcebergCatalogCredential, S3CatalogFileIO} +import com.sneaksanddata.arcane.framework.services.lakehouse.base.IcebergCatalogSettings +import zio.ZLayer +import upickle.default.* + +import java.time.Duration + +trait AzureConnectionSettings: + val endpoint: String + val container: String + val account: String + val accessKey: String + +/** + * The configuration of Iceberg sink. + */ +case class CatalogSettings(namespace: String, warehouse: String, catalogUri: String) derives ReadWriter + +/** + * The specification for the stream. 
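+ * Deserialized (via uPickle) from the STREAMCONTEXT__SPEC environment variable and mirrors the
+ * spec section of the MicrosoftSynapseStream custom resource.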
+ * + * @param name The name of the CDM table + * @param baseLocation The entity base location + * @param rowsPerGroup The number of rows per group in the staging table + * @param groupingIntervalSeconds The grouping interval in seconds + * @param groupsPerFile The number of groups per file + * @param lookBackInterval The look back interval in seconds + * @param changeCaptureIntervalSeconds The change capture interval in seconds + * @param partitionExpression Partition expression for partitioning the data in the staging table (optional) + */ +case class StreamSpec(name: String, + baseLocation: String, + + // Grouping settings + rowsPerGroup: Int, + groupingIntervalSeconds: Int, + groupsPerFile: Int, + lookBackInterval: Int, + + // Timeouts + changeCaptureIntervalSeconds: Int, + + // Iceberg settings + catalog: CatalogSettings, + + stagingLocation: Option[String], + sinkLocation: String, + partitionExpression: Option[String]) + derives ReadWriter + + +/** + * The context for the SQL Server Change Tracking stream. + * @param spec The stream specification + */ +case class MicrosoftSynapseLinkStreamContext(spec: StreamSpec) extends StreamContext + with GroupingSettings + with IcebergCatalogSettings + with JdbcConsumerOptions + with VersionedDataGraphBuilderSettings + with AzureConnectionSettings + with SinkSettings: + + override val rowsPerGroup: Int = spec.rowsPerGroup + override val lookBackInterval: Duration = Duration.ofSeconds(spec.lookBackInterval) + override val changeCaptureInterval: Duration = Duration.ofSeconds(spec.changeCaptureIntervalSeconds) + override val groupingInterval: Duration = Duration.ofSeconds(spec.groupingIntervalSeconds) + + override val namespace: String = spec.catalog.namespace + override val warehouse: String = spec.catalog.warehouse + override val catalogUri: String = spec.catalog.catalogUri + + override val additionalProperties: Map[String, String] = IcebergCatalogCredential.oAuth2Properties + override val s3CatalogFileIO: S3CatalogFileIO = S3CatalogFileIO + + override val stagingLocation: Option[String] = spec.stagingLocation + + override val connectionUrl: String = sys.env("ARCANE_FRAMEWORK__MERGE_SERVICE_CONNECTION_URI") + + /** + * The target table to write the data. + */ + override val sinkLocation: String = spec.sinkLocation + + override val endpoint: String = sys.env("ARCANE_FRAMEWORK__STORAGE_ENDPOINT") + override val container: String = sys.env("ARCANE_FRAMEWORK__STORAGE_CONTAINER") + override val account: String = sys.env("ARCANE_FRAMEWORK__STORAGE_ACCOUNT") + override val accessKey: String = sys.env("ARCANE_FRAMEWORK__STORAGE_ACCESS_KEY") + + +given Conversion[MicrosoftSynapseLinkStreamContext, CdmTableSettings] with + def apply(context: MicrosoftSynapseLinkStreamContext): CdmTableSettings = CdmTableSettings(context.spec.name, context.spec.baseLocation) + +object MicrosoftSynapseLinkStreamContext { + type Environment = StreamContext + & CdmTableSettings + & GroupingSettings + & VersionedDataGraphBuilderSettings + & IcebergCatalogSettings + & JdbcConsumerOptions + & SinkSettings + & AzureConnectionSettings + + /** + * The ZLayer that creates the VersionedDataGraphBuilder. 
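+ * Reads the stream specification from the STREAMCONTEXT__SPEC environment variable and fails the
+ * layer with an error if the variable is not set.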
+ */ + val layer: ZLayer[Any, Throwable, Environment] = + sys.env.get("STREAMCONTEXT__SPEC") map { raw => + val spec = read[StreamSpec](raw) + val context = MicrosoftSynapseLinkStreamContext(spec) + ZLayer.succeed(context) ++ ZLayer.succeed[CdmTableSettings](context) + } getOrElse { + ZLayer.fail(new Exception("The stream context is not specified.")) + } +} diff --git a/src/main/scala/services/StreamGraphBuilderFactory.scala b/src/main/scala/services/StreamGraphBuilderFactory.scala new file mode 100644 index 0000000..f249910 --- /dev/null +++ b/src/main/scala/services/StreamGraphBuilderFactory.scala @@ -0,0 +1,29 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services + +import services.graph_builder.{BackfillDataGraphBuilder, VersionedDataGraphBuilder} + +import com.sneaksanddata.arcane.framework.models.DataRow +import com.sneaksanddata.arcane.framework.models.app.StreamContext +import com.sneaksanddata.arcane.framework.services.streaming.base.StreamGraphBuilder +import zio.{ZIO, ZLayer} + +import java.time.OffsetDateTime + +/** + * Provides a layer that injects a stream graph builder resolved based on the stream context at runtime. + */ +object StreamGraphBuilderFactory: + + private type Environment = StreamContext + & VersionedDataGraphBuilder.Environment + + val layer: ZLayer[Environment, Nothing, StreamGraphBuilder] = ZLayer.fromZIO(getGraphBuilder) + + private def getGraphBuilder = + for + context <- ZIO.service[StreamContext] + _ <- ZIO.log("Start the graph builder type resolution") + builder <- VersionedDataGraphBuilder.layer + yield builder + diff --git a/src/main/scala/services/clients/JdbcConsumer.scala b/src/main/scala/services/clients/JdbcConsumer.scala new file mode 100644 index 0000000..88f307a --- /dev/null +++ b/src/main/scala/services/clients/JdbcConsumer.scala @@ -0,0 +1,90 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.clients + +import com.sneaksanddata.arcane.framework.services.consumers.{JdbcConsumerOptions, StagedVersionedBatch} +import services.clients.{BatchArchivationResult, JdbcConsumer} + +import org.slf4j.{Logger, LoggerFactory} +import zio.{ZIO, ZLayer} + +import java.sql.{Connection, DriverManager, ResultSet} +import scala.concurrent.Future +import scala.util.Try + +/** + * The result of applying a batch. + */ +type BatchApplicationResult = Boolean + +/** + * The result of applying a batch. + */ +class BatchArchivationResult + +/** + * A consumer that consumes batches from a JDBC source. + * + * @param options The options for the consumer. 
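+ *
+ * A rough usage sketch; the batch value and an implicit ExecutionContext are assumed to be
+ * in scope, and the results are the plain Futures exposed by this class:
+ * {{{
+ *   val consumer = JdbcConsumer[StagedVersionedBatch](options)
+ *   val archived: Future[BatchArchivationResult] =
+ *     consumer.applyBatch(batch).flatMap(_ => consumer.archiveBatch(batch))
+ * }}}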
+ */ +class JdbcConsumer[Batch <: StagedVersionedBatch](val options: JdbcConsumerOptions) extends AutoCloseable: + require(options.isValid, "Invalid JDBC url provided for the consumer") + + implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global + + private val logger: Logger = LoggerFactory.getLogger(this.getClass) + private lazy val sqlConnection: Connection = DriverManager.getConnection(options.connectionUrl) + + def getPartitionValues(batchName: String, partitionFields: List[String]): Future[Map[String, List[String]]] = + Future.sequence(partitionFields + .map(partitionField => + val query = s"SELECT DISTINCT $partitionField FROM $batchName" + Future(sqlConnection.prepareStatement(query).executeQuery()) + .map(collectPartitionColumn(_, partitionField)) + .map(values => partitionField -> values.toList) + )).map(_.toMap) + + + def applyBatch(batch: Batch): Future[BatchApplicationResult] = + Future{ + logger.debug(s"Executing batch query: ${batch.batchQuery.query}") + sqlConnection.prepareStatement(batch.batchQuery.query).execute() + } + + def archiveBatch(batch: Batch): Future[BatchArchivationResult] = + Future(sqlConnection.prepareStatement(batch.archiveExpr).execute()) + .flatMap(_ => Future(sqlConnection.prepareStatement(s"DROP TABLE ${batch.name}").execute())) + .map(_ => new BatchArchivationResult) + + def close(): Unit = sqlConnection.close() + + private def collectPartitionColumn(resultSet: ResultSet, columnName: String): Seq[String] = + // do not fail on closed result sets + if resultSet.isClosed then + Seq.empty + else + val current = resultSet.getString(columnName) + if resultSet.next() then + collectPartitionColumn(resultSet, columnName) :+ current + else + resultSet.close() + Seq(current) + + +object JdbcConsumer: + /** + * Factory method to create JdbcConsumer. + * @param options The options for the consumer. + * @return The initialized JdbcConsumer instance + */ + def apply[Batch <: StagedVersionedBatch](options: JdbcConsumerOptions): JdbcConsumer[Batch] = + new JdbcConsumer[Batch](options) + + /** + * The ZLayer that creates the JdbcConsumer. 
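+ *
+ * The consumer is created as a scoped resource, so the underlying JDBC connection is closed
+ * when the layer is released; only JdbcConsumerOptions is required from the environment.
+ * A hedged sketch (the options layer is assumed to exist):
+ * {{{
+ *   ZLayer.make[JdbcConsumer[StagedVersionedBatch]](JdbcConsumer.layer, optionsLayer)
+ * }}}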
+ */ + val layer: ZLayer[JdbcConsumerOptions, Nothing, JdbcConsumer[StagedVersionedBatch]] = + ZLayer.scoped { + ZIO.fromAutoCloseable { + for connectionOptions <- ZIO.service[JdbcConsumerOptions] yield JdbcConsumer(connectionOptions) + } + } diff --git a/src/main/scala/services/data_providers/microsoft_synapse_link/CdmDataProvider.scala b/src/main/scala/services/data_providers/microsoft_synapse_link/CdmDataProvider.scala new file mode 100644 index 0000000..f7f6bf2 --- /dev/null +++ b/src/main/scala/services/data_providers/microsoft_synapse_link/CdmDataProvider.scala @@ -0,0 +1,54 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.data_providers.microsoft_synapse_link + +import services.streaming.base.VersionedDataProvider +import com.sneaksanddata.arcane.microsoft_synapse_link.models.app.AzureConnectionSettings +import com.sneaksanddata.arcane.framework.models.DataRow +import com.sneaksanddata.arcane.framework.services.cdm.{CdmTable, CdmTableSettings} +import com.sneaksanddata.arcane.framework.services.mssql.MsSqlConnection.BackfillBatch +import com.sneaksanddata.arcane.framework.services.storage.models.azure.AzureBlobStorageReader +import com.sneaksanddata.arcane.framework.services.streaming.base.BackfillDataProvider +import zio.{Task, ZIO, ZLayer} + +import java.time.{Duration, OffsetDateTime, ZoneOffset} +import scala.concurrent.Future +/** + * A data provider that reads the changes from the Microsoft SQL Server. + * @param cdmTable The CDM table representation. + */ +class CdmDataProvider(cdmTable: CdmTable) extends VersionedDataProvider[OffsetDateTime, LazyList[DataRow]] with BackfillDataProvider: + + override def extractVersion(dataBatch: LazyList[DataRow]): Option[OffsetDateTime] = Some(OffsetDateTime.now()) // TODO: this should return the version from the last data row + + override def requestBackfill: Task[BackfillBatch] = ??? + + override def requestChanges(previousVersion: Option[OffsetDateTime], lookBackInterval: Duration): Task[LazyList[DataRow]] = + val time = previousVersion.getOrElse(OffsetDateTime.now().minusHours(12)) + ZIO.fromFuture(_ => cdmTable.snapshot(Some(time))) + +/** + * The companion object for the MsSqlDataProvider class. + */ +object CdmDataProvider: + implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global + + type Environment = AzureConnectionSettings + & CdmTableSettings + & AzureBlobStorageReader + & CdmSchemaProvider + + /** + * The ZLayer that creates the CdmDataProvider. 
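+ *
+ * The layer resolves the CDM entity schema before constructing the table reader, so it needs
+ * AzureConnectionSettings, CdmTableSettings, an AzureBlobStorageReader and a CdmSchemaProvider
+ * in the environment. A minimal sketch of requesting changes from the built provider:
+ * {{{
+ *   ZIO.serviceWithZIO[CdmDataProvider](_.requestChanges(None, Duration.ofHours(12)))
+ * }}}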
+ */ + val layer: ZLayer[Environment, Throwable, CdmDataProvider] = + ZLayer { + for { + _ <- ZIO.log("Creating the CDM data provider") + connectionSettings <- ZIO.service[AzureConnectionSettings] + tableSettings <- ZIO.service[CdmTableSettings] + reader <- ZIO.service[AzureBlobStorageReader] + schemaProvider <- ZIO.service[CdmSchemaProvider] + l <- ZIO.fromFuture(_ => schemaProvider.getEntity) + cdmTable = CdmTable(tableSettings, l, reader) + } yield CdmDataProvider(cdmTable) + } diff --git a/src/main/scala/services/data_providers/microsoft_synapse_link/CdmSchemaProvider.scala b/src/main/scala/services/data_providers/microsoft_synapse_link/CdmSchemaProvider.scala new file mode 100644 index 0000000..fc9e089 --- /dev/null +++ b/src/main/scala/services/data_providers/microsoft_synapse_link/CdmSchemaProvider.scala @@ -0,0 +1,42 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.data_providers.microsoft_synapse_link + +import com.sneaksanddata.arcane.framework.models.ArcaneSchema +import com.sneaksanddata.arcane.framework.models.cdm.{SimpleCdmEntity, SimpleCdmModel, given_Conversion_SimpleCdmEntity_ArcaneSchema} +import com.sneaksanddata.arcane.framework.services.base.SchemaProvider +import com.sneaksanddata.arcane.framework.services.cdm.CdmTableSettings +import com.sneaksanddata.arcane.framework.services.mssql.given_CanAdd_ArcaneSchema +import com.sneaksanddata.arcane.framework.services.storage.models.azure.AzureBlobStorageReader + +import zio.{ZIO, ZLayer} + +import scala.concurrent.Future + +class CdmSchemaProvider(azureBlobStorageReader: AzureBlobStorageReader, tableLocation: String, tableName: String) + extends SchemaProvider[ArcaneSchema]: + + implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global + + override def getSchema: Future[SchemaType] = getEntity.flatMap(toArcaneSchema) + + def getEntity: Future[SimpleCdmEntity] = + SimpleCdmModel(tableLocation, azureBlobStorageReader).flatMap(_.entities.find(_.name == tableName) match + case None => Future.failed(new Exception(s"Table $tableName not found in model $tableLocation")) + case Some(entity) => Future.successful(entity) + ) + + override def empty: SchemaType = ArcaneSchema.empty() + + private def toArcaneSchema(simpleCdmModel: SimpleCdmEntity): Future[ArcaneSchema] = Future.successful(simpleCdmModel) + +object CdmSchemaProvider: + + private type Environment = AzureBlobStorageReader & CdmTableSettings + + val layer: ZLayer[Environment, Nothing, CdmSchemaProvider] = + ZLayer { + for + context <- ZIO.service[CdmTableSettings] + settings <- ZIO.service[AzureBlobStorageReader] + yield CdmSchemaProvider(settings, context.rootPath, context.name) + } diff --git a/src/main/scala/services/data_providers/microsoft_synapse_link/StreamRunnerServiceImpl.scala b/src/main/scala/services/data_providers/microsoft_synapse_link/StreamRunnerServiceImpl.scala new file mode 100644 index 0000000..5626149 --- /dev/null +++ b/src/main/scala/services/data_providers/microsoft_synapse_link/StreamRunnerServiceImpl.scala @@ -0,0 +1,40 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.data_providers.microsoft_synapse_link + +import com.sneaksanddata.arcane.framework.services.app.base.{StreamLifetimeService, StreamRunnerService} +import com.sneaksanddata.arcane.framework.services.streaming.base.StreamGraphBuilder + +import zio.{ZIO, ZLayer} + +/** + * A service that can be used to run a stream. + * + * @param builder The stream graph builder. 
+ * @param lifetimeService The stream lifetime service. + */ +private class StreamRunnerServiceCdm(builder: StreamGraphBuilder, lifetimeService: StreamLifetimeService) extends StreamRunnerService: + + /** + * Runs the stream. + * + * @return A ZIO effect that represents the stream. + */ + def run: ZIO[Any, Throwable, Unit] = + lifetimeService.start() + builder.create.run(builder.consume) + +/** + * The companion object for the StreamRunnerServiceImpl class. + */ +object StreamRunnerServiceImpl: + + /** + * The ZLayer for the stream runner service. + */ + val layer: ZLayer[StreamGraphBuilder & StreamLifetimeService, Nothing, StreamRunnerService] = + ZLayer { + for { + builder <- ZIO.service[StreamGraphBuilder] + lifetimeService <- ZIO.service[StreamLifetimeService] + } yield new StreamRunnerServiceCdm(builder, lifetimeService) + } diff --git a/src/main/scala/services/graph_builder/BackfillDataGraphBuilder.scala b/src/main/scala/services/graph_builder/BackfillDataGraphBuilder.scala new file mode 100644 index 0000000..ee1cb89 --- /dev/null +++ b/src/main/scala/services/graph_builder/BackfillDataGraphBuilder.scala @@ -0,0 +1,68 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.graph_builder + +import com.sneaksanddata.arcane.framework.models.DataRow +import com.sneaksanddata.arcane.framework.services.app.base.StreamLifetimeService +import com.sneaksanddata.arcane.framework.services.streaming.base.{BackfillDataProvider, BatchProcessor, StreamGraphBuilder} +import com.sneaksanddata.arcane.framework.services.streaming.consumers.BackfillConsumer + +import org.slf4j.{Logger, LoggerFactory} +import zio.stream.{ZSink, ZStream} +import zio.{Chunk, ZIO} + +class BackfillDataGraphBuilder(backfillDataProvider: BackfillDataProvider, + streamLifetimeService: StreamLifetimeService, + batchProcessor: BatchProcessor[DataRow, Chunk[DataRow]], + batchConsumer: BackfillConsumer) + extends StreamGraphBuilder: + + + private val logger: Logger = LoggerFactory.getLogger(classOf[BackfillDataGraphBuilder]) + + override type StreamElementType = Chunk[DataRow] + + override def create: ZStream[Any, Throwable, StreamElementType] = + ZStream.fromZIO(backfillDataProvider.requestBackfill) + .takeUntil(_ => streamLifetimeService.cancelled) + .flatMap(batch => ZStream.fromIterable(batch.read)) + .via(batchProcessor.process) + + override def consume: ZSink[Any, Throwable, StreamElementType, Any, Unit] = batchConsumer.consume + +/** + * The companion object for the VersionedDataGraphBuilder class. + */ +object BackfillDataGraphBuilder: + type Environment = BackfillDataProvider + & StreamLifetimeService + & BatchProcessor[DataRow, Chunk[DataRow]] + & BackfillConsumer + + /** + * Creates a new instance of the BackfillDataGraphBuilder class. + * + * @param backfillDataProvider The backfill data provider. + * @param streamLifetimeService The stream lifetime service. + * @param batchProcessor The batch processor. + * @return A new instance of the BackfillDataGraphBuilder class. + */ + def apply(backfillDataProvider: BackfillDataProvider, + streamLifetimeService: StreamLifetimeService, + batchProcessor: BatchProcessor[DataRow, Chunk[DataRow]], + batchConsumer: BackfillConsumer): BackfillDataGraphBuilder = + new BackfillDataGraphBuilder(backfillDataProvider, streamLifetimeService, batchProcessor, batchConsumer) + + /** + * Creates a new instance of the BackfillDataGraphBuilder using services provided by ZIO Environment. + * + * @return A new instance of the BackfillDataGraphBuilder class. 
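+ *
+ * A hedged sketch of building and running the backfill graph; every service listed in
+ * Environment is assumed to be provided elsewhere:
+ * {{{
+ *   BackfillDataGraphBuilder().flatMap(graph => graph.create.run(graph.consume))
+ * }}}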
+ */ + def apply(): ZIO[Environment, Nothing, BackfillDataGraphBuilder] = + for + _ <- ZIO.log("Running in backfill mode") + dp <- ZIO.service[BackfillDataProvider] + ls <- ZIO.service[StreamLifetimeService] + bp <- ZIO.service[BatchProcessor[DataRow, Chunk[DataRow]]] + bc <- ZIO.service[BackfillConsumer] + yield BackfillDataGraphBuilder(dp, ls, bp, bc) + diff --git a/src/main/scala/services/graph_builder/VersionedDataGraphBuilder.scala b/src/main/scala/services/graph_builder/VersionedDataGraphBuilder.scala new file mode 100644 index 0000000..bfdfde0 --- /dev/null +++ b/src/main/scala/services/graph_builder/VersionedDataGraphBuilder.scala @@ -0,0 +1,112 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.graph_builder + +import main.validateEnv +import services.streaming.base.VersionedDataProvider + +import com.sneaksanddata.arcane.framework.models.DataRow +import com.sneaksanddata.arcane.framework.models.settings.VersionedDataGraphBuilderSettings +import com.sneaksanddata.arcane.framework.services.app.base.StreamLifetimeService +import com.sneaksanddata.arcane.framework.services.mssql.MsSqlConnection.DataBatch +import com.sneaksanddata.arcane.framework.services.streaming.base.{BatchProcessor, StreamGraphBuilder} +import com.sneaksanddata.arcane.framework.services.streaming.consumers.StreamingConsumer +import org.slf4j.{Logger, LoggerFactory} +import zio.stream.{ZSink, ZStream} +import zio.{Chunk, Schedule, ZIO} + +import java.time.OffsetDateTime + +/** + * The stream graph builder that reads the changes from the database. + * @param versionedDataGraphBuilderSettings The settings for the stream source. + * @param versionedDataProvider The versioned data provider. + * @param streamLifetimeService The stream lifetime service. + * @param batchProcessor The batch processor. + */ +class VersionedDataGraphBuilder[VersionType, BatchType] + (versionedDataGraphBuilderSettings: VersionedDataGraphBuilderSettings, + versionedDataProvider: VersionedDataProvider[VersionType, BatchType], + streamLifetimeService: StreamLifetimeService, + batchProcessor: BatchProcessor[BatchType, Chunk[DataRow]], + batchConsumer: StreamingConsumer) + extends StreamGraphBuilder: + + private val logger: Logger = LoggerFactory.getLogger(classOf[VersionedDataGraphBuilder[VersionType, BatchType]]) + override type StreamElementType = Chunk[DataRow] + + /** + * Builds a stream that reads the changes from the database. + * + * @return The stream that reads the changes from the database. + */ + override def create: ZStream[Any, Throwable, StreamElementType] = this.createStream.via(this.batchProcessor.process) + + /** + * Creates a ZStream for the stream graph. + * + * @return ZStream (stream source for the stream graph). 
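+ *
+ * Both members are wired together by the stream runner, which runs the created stream into
+ * this sink (a rough sketch; the graph value is assumed):
+ * {{{
+ *   graph.create.run(graph.consume)
+ * }}}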
+ */ + override def consume: ZSink[Any, Throwable, Chunk[DataRow], Any, Unit] = batchConsumer.consume + + private def createStream = ZStream + .unfoldZIO(versionedDataProvider.firstVersion) { previousVersion => + if streamLifetimeService.cancelled then + ZIO.succeed(None) + else + continueStream(previousVersion) + } + .schedule(Schedule.spaced(versionedDataGraphBuilderSettings.changeCaptureInterval)) + + private def continueStream(previousVersion: Option[VersionType]): ZIO[Any, Throwable, Some[(BatchType, Option[VersionType])]] = + versionedDataProvider.requestChanges(previousVersion, versionedDataGraphBuilderSettings.lookBackInterval) map { versionedBatch => + val latestVersion = versionedDataProvider.extractVersion(versionedBatch) + logger.info(s"Latest version: $latestVersion") + Some(versionedBatch, latestVersion) + } + +/** + * The companion object for the VersionedDataGraphBuilder class. + */ +object VersionedDataGraphBuilder: + + type Environment = VersionedDataProvider[OffsetDateTime, LazyList[DataRow]] + & StreamLifetimeService + & BatchProcessor[LazyList[DataRow], Chunk[DataRow]] + & StreamingConsumer + & VersionedDataGraphBuilderSettings + + /** + * Creates a new instance of the BackfillDataGraphBuilder class. + * + * @param versionedDataProvider The backfill data provider. + * @param streamLifetimeService The stream lifetime service. + * @param batchProcessor The batch processor. + * @return A new instance of the BackfillDataGraphBuilder class. + */ + def apply[VersionType, BatchType](versionedDataGraphBuilderSettings: VersionedDataGraphBuilderSettings, + versionedDataProvider: VersionedDataProvider[VersionType, BatchType], + streamLifetimeService: StreamLifetimeService, + batchProcessor: BatchProcessor[BatchType, Chunk[DataRow]], + batchConsumer: StreamingConsumer): VersionedDataGraphBuilder[VersionType, BatchType] = + new VersionedDataGraphBuilder(versionedDataGraphBuilderSettings, + versionedDataProvider, + streamLifetimeService, + batchProcessor, + batchConsumer) + + /** + * Creates a new instance of the BackfillDataGraphBuilder using services provided by ZIO Environment. + * + * @return A new instance of the BackfillDataGraphBuilder class. + */ + def layer: ZIO[Environment, Nothing, VersionedDataGraphBuilder[OffsetDateTime, LazyList[DataRow]]] = + for + _ <- ZIO.log("Running in streaming mode") + sss <- ZIO.service[VersionedDataGraphBuilderSettings] + dp <- ZIO.service[VersionedDataProvider[OffsetDateTime, LazyList[DataRow]]] + ls <- ZIO.service[StreamLifetimeService] + bp <- ZIO.service[BatchProcessor[LazyList[DataRow], Chunk[DataRow]]] + bc <- ZIO.service[StreamingConsumer] + yield VersionedDataGraphBuilder(sss, dp, ls, bp, bc) + + diff --git a/src/main/scala/services/streaming/base/VersionedDataProvider.scala b/src/main/scala/services/streaming/base/VersionedDataProvider.scala new file mode 100644 index 0000000..23a575b --- /dev/null +++ b/src/main/scala/services/streaming/base/VersionedDataProvider.scala @@ -0,0 +1,31 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.streaming.base + +import zio.Task + +import java.time.Duration + +/** + * Provides a way to get the changes marked with version from a data source. + * @tparam DataVersionType The type of the data version. + * @tparam DataBatchType The type of the data batch. + */ +trait VersionedDataProvider[DataVersionType, DataBatchType] { + + /** + * Requests the changes from the data source. + * + * @param previousVersion The previous version of the data. 
+ * @param lookBackInterval The interval to look back for changes if the version is empty. + * @return The changes from the data source. + */ + def requestChanges(previousVersion: Option[DataVersionType], lookBackInterval: Duration): Task[DataBatchType] + + def extractVersion(dataBatch: DataBatchType): Option[DataVersionType] + + /** + * The first version of the data. + */ + val firstVersion: Option[DataVersionType] = None + +} diff --git a/src/main/scala/services/streaming/consumers/IcebergSynapseConsumer.scala b/src/main/scala/services/streaming/consumers/IcebergSynapseConsumer.scala new file mode 100644 index 0000000..c3880c7 --- /dev/null +++ b/src/main/scala/services/streaming/consumers/IcebergSynapseConsumer.scala @@ -0,0 +1,117 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.streaming.consumers + +import com.sneaksanddata.arcane.framework.models.{ArcaneSchema, DataRow} +import com.sneaksanddata.arcane.framework.models.app.StreamContext +import com.sneaksanddata.arcane.framework.models.settings.SinkSettings +import com.sneaksanddata.arcane.framework.services.base.SchemaProvider +import com.sneaksanddata.arcane.framework.services.consumers.{BatchApplicationResult, StagedVersionedBatch, SynapseLinkMergeBatch} +import com.sneaksanddata.arcane.framework.services.lakehouse.CatalogWriter +import com.sneaksanddata.arcane.framework.services.streaming.base.BatchProcessor +import com.sneaksanddata.arcane.framework.services.streaming.consumers.{IcebergStreamingConsumer, StreamingConsumer} +import org.apache.iceberg.rest.RESTCatalog +import org.apache.iceberg.{Schema, Table} +import org.slf4j.{Logger, LoggerFactory} +import zio.stream.{ZPipeline, ZSink} +import zio.{Chunk, Task, ZIO, ZLayer} + +import java.time.format.DateTimeFormatter +import java.time.{ZoneOffset, ZonedDateTime} +import IcebergSynapseConsumer.toStagedBatch +import IcebergSynapseConsumer.getTableName +import com.sneaksanddata.arcane.framework.services.lakehouse.given_Conversion_ArcaneSchema_Schema +import com.sneaksanddata.arcane.microsoft_synapse_link.services.clients.BatchArchivationResult + +class IcebergSynapseConsumer(streamContext: StreamContext, + sinkSettings: SinkSettings, + catalogWriter: CatalogWriter[RESTCatalog, Table, Schema], + schemaProvider: SchemaProvider[ArcaneSchema], + mergeProcessor: BatchProcessor[StagedVersionedBatch, StagedVersionedBatch], + archivationProcessor: BatchProcessor[StagedVersionedBatch, BatchArchivationResult]) + extends StreamingConsumer: + + private val logger: Logger = LoggerFactory.getLogger(classOf[IcebergStreamingConsumer]) + + /** + * Returns the sink that consumes the batch. + * + * @return ZSink (stream sink for the stream graph). 
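+ *
+ * Each incoming Chunk[DataRow] is staged as an Iceberg table, merged towards the sink
+ * location and then archived. A rough sketch of driving the sink directly (the consumer and
+ * rows values are assumed):
+ * {{{
+ *   ZStream.fromIterable(Seq(rows)).run(consumer.consume)
+ * }}}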
+ */ + override def consume: ZSink[Any, Throwable, Chunk[DataRow], Any, Unit] = + writeStagingTable >>> mergeProcessor.process >>> archivationProcessor.process >>> logResults + + + private def logResults: ZSink[Any, Throwable, BatchArchivationResult, Nothing, Unit] = ZSink.foreach { e => + logger.info(s"Received the batch $e from the streaming source") + ZIO.unit + } + + private def writeStagingTable = ZPipeline[Chunk[DataRow]]() + .mapAccum(0L) { (acc, chunk) => (acc + 1, (chunk, acc.getTableName(streamContext.streamId))) } + .mapZIO({ + case (rows, tableName) => writeWithWriter(rows, tableName) + }) + + + private def writeWithWriter(rows: Chunk[DataRow], name: String): Task[StagedVersionedBatch] = + for + arcaneSchema <- ZIO.fromFuture(implicit ec => schemaProvider.getSchema) + table <- ZIO.fromFuture(implicit ec => catalogWriter.write(rows, name, arcaneSchema)) + yield table.toStagedBatch(arcaneSchema, sinkSettings.sinkLocation, Map()) + + +object IcebergSynapseConsumer: + + val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss") + + extension (batchNumber: Long) def getTableName(streamId: String): String = + s"${streamId}_${ZonedDateTime.now(ZoneOffset.UTC).format(formatter)}_$batchNumber" + + extension (table: Table) def toStagedBatch(batchSchema: ArcaneSchema, + targetName: String, + partitionValues: Map[String, List[String]]): StagedVersionedBatch = + val batchName = table.name().split('.').last + SynapseLinkMergeBatch(batchName, batchSchema, targetName, partitionValues) + + + /** + * Factory method to create IcebergConsumer + * + * @param streamContext The stream context. + * @param sinkSettings The stream sink settings. + * @param catalogWriter The catalog writer. + * @param schemaProvider The schema provider. + * @return The initialized IcebergConsumer instance + */ + def apply(streamContext: StreamContext, + sinkSettings: SinkSettings, + catalogWriter: CatalogWriter[RESTCatalog, Table, Schema], + schemaProvider: SchemaProvider[ArcaneSchema], + mergeProcessor: BatchProcessor[StagedVersionedBatch, StagedVersionedBatch], + archivationProcessor: BatchProcessor[StagedVersionedBatch, BatchArchivationResult]): IcebergSynapseConsumer = + new IcebergSynapseConsumer(streamContext, sinkSettings, catalogWriter, schemaProvider, mergeProcessor, archivationProcessor) + + /** + * The required environment for the IcebergConsumer. + */ + type Environment = SchemaProvider[ArcaneSchema] + & CatalogWriter[RESTCatalog, Table, Schema] + & BatchProcessor[StagedVersionedBatch, StagedVersionedBatch] + & StreamContext + & SinkSettings + & BatchProcessor[StagedVersionedBatch, BatchArchivationResult] + + /** + * The ZLayer that creates the IcebergConsumer. 
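+ *
+ * Every service listed in Environment must be supplied by the caller, as done in the
+ * application entry point; an abbreviated sketch:
+ * {{{
+ *   appLayer.provide(IcebergSynapseConsumer.layer, MergeBatchProcessor.layer, ArchivationProcessor.layer /* , remaining layers */)
+ * }}}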
+ */ + val layer: ZLayer[Environment, Nothing, StreamingConsumer] = + ZLayer { + for + streamContext <- ZIO.service[StreamContext] + sinkSettings <- ZIO.service[SinkSettings] + catalogWriter <- ZIO.service[CatalogWriter[RESTCatalog, Table, Schema]] + schemaProvider <- ZIO.service[SchemaProvider[ArcaneSchema]] + mergeProcessor <- ZIO.service[BatchProcessor[StagedVersionedBatch, StagedVersionedBatch]] + archivationProcessor <- ZIO.service[BatchProcessor[StagedVersionedBatch, BatchArchivationResult]] + yield IcebergSynapseConsumer(streamContext, sinkSettings, catalogWriter, schemaProvider, mergeProcessor, archivationProcessor) + } diff --git a/src/main/scala/services/streaming/processors/ArchivationProcessor.scala b/src/main/scala/services/streaming/processors/ArchivationProcessor.scala new file mode 100644 index 0000000..fd391db --- /dev/null +++ b/src/main/scala/services/streaming/processors/ArchivationProcessor.scala @@ -0,0 +1,29 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.streaming.processors + +import services.clients.{BatchArchivationResult, JdbcConsumer} + +import com.sneaksanddata.arcane.framework.services.consumers.StagedVersionedBatch +import com.sneaksanddata.arcane.framework.services.streaming.base.BatchProcessor +import zio.stream.ZPipeline +import zio.{ZIO, ZLayer} + +class ArchivationProcessor(jdbcConsumer: JdbcConsumer[StagedVersionedBatch]) + extends BatchProcessor[StagedVersionedBatch, BatchArchivationResult]: + + override def process: ZPipeline[Any, Throwable, StagedVersionedBatch, BatchArchivationResult] = + ZPipeline.mapZIO(batch => ZIO.fromFuture(implicit ec => jdbcConsumer.archiveBatch(batch))) + +object ArchivationProcessor: + + type Environment = JdbcConsumer[StagedVersionedBatch] + + def apply(jdbcConsumer: JdbcConsumer[StagedVersionedBatch]): ArchivationProcessor = + new ArchivationProcessor(jdbcConsumer) + + val layer: ZLayer[Environment, Nothing, ArchivationProcessor] = + ZLayer { + for + jdbcConsumer <- ZIO.service[JdbcConsumer[StagedVersionedBatch]] + yield ArchivationProcessor(jdbcConsumer) + } diff --git a/src/main/scala/services/streaming/processors/CdmGroupingProcessor.scala b/src/main/scala/services/streaming/processors/CdmGroupingProcessor.scala new file mode 100644 index 0000000..7cbb6b7 --- /dev/null +++ b/src/main/scala/services/streaming/processors/CdmGroupingProcessor.scala @@ -0,0 +1,54 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.streaming.processors + +import com.sneaksanddata.arcane.framework.models.ArcaneType.* +import com.sneaksanddata.arcane.framework.models.settings.GroupingSettings +import com.sneaksanddata.arcane.framework.models.{ArcaneType, DataCell, DataRow} +import com.sneaksanddata.arcane.framework.services.streaming.base.BatchProcessor + +import zio.stream.ZPipeline +import zio.{Chunk, ZIO, ZLayer} + +import scala.concurrent.duration.Duration + +/** + * The batch processor implementation that converts a lazy DataBatch to a Chunk of DataRow. + * @param groupingSettings The grouping settings. + */ +class CdmGroupingProcessor(groupingSettings: GroupingSettings, typeAlignmentProcessor: TypeAlignmentService) + extends BatchProcessor[LazyList[DataRow], Chunk[DataRow]]: + + /** + * Processes the incoming data. + * + * @return ZPipeline (stream source for the stream graph). 
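+ *
+ * Rows from each lazy batch are flattened, type-aligned and regrouped into chunks of at most
+ * rowsPerGroup rows, or fewer when groupingInterval elapses first. A rough sketch (the batch
+ * and processor values are assumed):
+ * {{{
+ *   // with rowsPerGroup = 3, a batch of 7 rows yields chunks of sizes 3, 3 and 1
+ *   ZStream.fromIterable(Seq(batch)).via(processor.process)
+ * }}}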
+ */ + def process: ZPipeline[Any, Throwable, LazyList[DataRow], Chunk[DataRow]] = ZPipeline + .map[LazyList[DataRow], Chunk[DataRow]](list => Chunk.fromIterable(list)) + .flattenChunks + .map(typeAlignmentProcessor.alignTypes) + .groupedWithin(groupingSettings.rowsPerGroup, groupingSettings.groupingInterval) + +/** + * The companion object for the LazyOutputDataProcessor class. + */ +object CdmGroupingProcessor: + + type Environment = GroupingSettings + & TypeAlignmentService + + /** + * The ZLayer that creates the LazyOutputDataProcessor. + */ + val layer: ZLayer[Environment, Nothing, CdmGroupingProcessor] = + ZLayer { + for + settings <- ZIO.service[GroupingSettings] + tas <- ZIO.service[TypeAlignmentService] + yield CdmGroupingProcessor(settings, tas) + } + + def apply(groupingSettings: GroupingSettings, typeAlignmentService: TypeAlignmentService): CdmGroupingProcessor = + require(groupingSettings.rowsPerGroup > 0, "Rows per group must be greater than 0") + require(!groupingSettings.groupingInterval.equals(Duration.Zero), "groupingInterval must be greater than 0") + new CdmGroupingProcessor(groupingSettings, typeAlignmentService) diff --git a/src/main/scala/services/streaming/processors/MergeBatchProcessor.scala b/src/main/scala/services/streaming/processors/MergeBatchProcessor.scala new file mode 100644 index 0000000..0d559fb --- /dev/null +++ b/src/main/scala/services/streaming/processors/MergeBatchProcessor.scala @@ -0,0 +1,50 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.streaming.processors + +import com.sneaksanddata.arcane.framework.services.consumers.StagedVersionedBatch +import com.sneaksanddata.arcane.framework.services.streaming.base.BatchProcessor +import com.sneaksanddata.arcane.microsoft_synapse_link.services.clients.JdbcConsumer +import zio.stream.ZPipeline +import zio.{ZIO, ZLayer} + +/** + * Processor that merges data into a target table. + * + * @param jdbcConsumer The JDBC consumer. + */ +class MergeBatchProcessor(jdbcConsumer: JdbcConsumer[StagedVersionedBatch]) + extends BatchProcessor[StagedVersionedBatch, StagedVersionedBatch]: + + /** + * Processes the incoming data. + * + * @return ZPipeline (stream source for the stream graph). + */ + override def process: ZPipeline[Any, Throwable, StagedVersionedBatch, StagedVersionedBatch] = + ZPipeline.mapZIO(batch => ZIO.fromFuture(implicit ec => jdbcConsumer.applyBatch(batch)).map(_ => batch)) + + +object MergeBatchProcessor: + + /** + * Factory method to create MergeProcessor + * @param jdbcConsumer The JDBC consumer. + * @return The initialized MergeProcessor instance + */ + def apply(jdbcConsumer: JdbcConsumer[StagedVersionedBatch]): MergeBatchProcessor = + new MergeBatchProcessor(jdbcConsumer) + + /** + * The required environment for the MergeProcessor. + */ + type Environment = JdbcConsumer[StagedVersionedBatch] + + /** + * The ZLayer that creates the MergeProcessor. 
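+ *
+ * The processor built here passes each batch through unchanged after executing its merge
+ * query, so it composes with the archivation step exactly as the consumer does:
+ * {{{
+ *   mergeProcessor.process >>> archivationProcessor.process
+ * }}}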
+ */ + val layer: ZLayer[Environment, Nothing, MergeBatchProcessor] = + ZLayer { + for + jdbcConsumer <- ZIO.service[JdbcConsumer[StagedVersionedBatch]] + yield MergeBatchProcessor(jdbcConsumer) + } diff --git a/src/main/scala/services/streaming/processors/TypeAlignmentService.scala b/src/main/scala/services/streaming/processors/TypeAlignmentService.scala new file mode 100644 index 0000000..bf24d5e --- /dev/null +++ b/src/main/scala/services/streaming/processors/TypeAlignmentService.scala @@ -0,0 +1,52 @@ +package com.sneaksanddata.arcane.microsoft_synapse_link +package services.streaming.processors + +import com.sneaksanddata.arcane.framework.models.ArcaneType.* +import com.sneaksanddata.arcane.framework.models.settings.GroupingSettings +import com.sneaksanddata.arcane.framework.models.{ArcaneType, DataCell, DataRow} +import com.sneaksanddata.arcane.framework.services.streaming.base.BatchProcessor +import zio.stream.ZPipeline +import zio.{Chunk, ZIO, ZLayer} +import zio.Chunk +import zio.stream.ZPipeline + +/** + * The service that processes a DataRow and aligns the types of the cells. + * This operation is necessary to ensure that the data is correctly processed by the stream graph. + * See the Microsoft Synapse documentation for more information: + * https://learn.microsoft.com/en-us/power-apps/maker/data-platform/export-data-lake-faq + */ +trait TypeAlignmentService: + def alignTypes(data: DataRow): DataRow + +private class TypeAlignmentServiceImpl extends TypeAlignmentService: + + override def alignTypes(row: DataRow): DataRow = row map { cell => + DataCell(cell.name, cell.Type, convertType(cell.Type, cell.value)) + } + + private def convertType(arcaneType: ArcaneType, value: Any): Any = + value match + case None => null + case Some(v) => convertSome(arcaneType, v) + + private def convertSome(arcaneType: ArcaneType, value: Any): Any = arcaneType match + case LongType => value.toString.toLong + case ByteArrayType => value.toString.getBytes + case BooleanType => value.toString.toBoolean + case StringType => value.toString + case DateType => java.sql.Date.valueOf(value.toString) + case TimestampType => null //java.sql.Timestamp.valueOf(value.toString) // TODO + case DateTimeOffsetType => java.time.OffsetDateTime.parse(value.toString) + case BigDecimalType => BigDecimal(value.toString) + case DoubleType => value.toString.toDouble + case IntType => value.toString.toInt + case FloatType => value.toString.toFloat + case ShortType => value.toString.toShort + case TimeType => java.sql.Time.valueOf(value.toString) + + +object TypeAlignmentService: + val layer: ZLayer[Any, Nothing, TypeAlignmentService] = ZLayer.succeed(new TypeAlignmentServiceImpl) + + def apply(): TypeAlignmentService = new TypeAlignmentServiceImpl diff --git a/unit-tests.env b/unit-tests.env new file mode 100644 index 0000000..e69de29