diff --git a/helm-charts/flink-historyserver/.helmignore b/helm-charts/flink-historyserver/.helmignore
new file mode 100644
index 0000000..0e8a0eb
--- /dev/null
+++ b/helm-charts/flink-historyserver/.helmignore
@@ -0,0 +1,23 @@
+# Patterns to ignore when building packages.
+# This supports shell glob matching, relative path matching, and
+# negation (prefixed with !). Only one pattern per line.
+.DS_Store
+# Common VCS dirs
+.git/
+.gitignore
+.bzr/
+.bzrignore
+.hg/
+.hgignore
+.svn/
+# Common backup files
+*.swp
+*.bak
+*.tmp
+*.orig
+*~
+# Various IDEs
+.project
+.idea/
+*.tmproj
+.vscode/
diff --git a/helm-charts/flink-historyserver/Chart.yaml b/helm-charts/flink-historyserver/Chart.yaml
new file mode 100644
index 0000000..aad6ff0
--- /dev/null
+++ b/helm-charts/flink-historyserver/Chart.yaml
@@ -0,0 +1,24 @@
+apiVersion: v2
+name: flink-historyserver
+description: A Helm chart for the Apache Flink HistoryServer on Kubernetes
+
+# A chart can be either an 'application' or a 'library' chart.
+#
+# Application charts are a collection of templates that can be packaged into versioned archives
+# to be deployed.
+#
+# Library charts provide useful utilities or functions for the chart developer. They're included as
+# a dependency of application charts to inject those utilities and functions into the rendering
+# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
+
+# This is the chart version. This version number should be incremented each time you make changes
+# to the chart and its templates, including the app version.
+# Versions are expected to follow Semantic Versioning (https://semver.org/)
+version: 0.1.2
+
+# This is the version number of the application being deployed. This version number should be
+# incremented each time you make changes to the application. Versions are not expected to
+# follow Semantic Versioning. They should reflect the version the application is using.
+# It is recommended to use it with quotes.
+appVersion: "0.1.2"
diff --git a/helm-charts/flink-historyserver/templates/_helpers.tpl b/helm-charts/flink-historyserver/templates/_helpers.tpl
new file mode 100644
index 0000000..91cae44
--- /dev/null
+++ b/helm-charts/flink-historyserver/templates/_helpers.tpl
@@ -0,0 +1,62 @@
+{{/*
+Expand the name of the chart.
+*/}}
+{{- define "flink-historyserver.name" -}}
+{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
+{{- end }}
+
+{{/*
+Create a default fully qualified app name.
+We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
+If release name contains chart name it will be used as a full name.
+*/}}
+{{- define "flink-historyserver.fullname" -}}
+{{- if .Values.fullnameOverride }}
+{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
+{{- else }}
+{{- $name := default .Chart.Name .Values.nameOverride }}
+{{- if contains $name .Release.Name }}
+{{- .Release.Name | trunc 63 | trimSuffix "-" }}
+{{- else }}
+{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
+{{- end }}
+{{- end }}
+{{- end }}
+
+{{/*
+Create chart name and version as used by the chart label.
+*/}}
+{{- define "flink-historyserver.chart" -}}
+{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
+{{- end }}
+
+{{/*
+Common labels
+*/}}
+{{- define "flink-historyserver.labels" -}}
+helm.sh/chart: {{ include "flink-historyserver.chart" . }}
+{{ include "flink-historyserver.selectorLabels" . }}
+{{- if .Chart.AppVersion }}
+app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
+{{- end }}
+app.kubernetes.io/managed-by: {{ .Release.Service }}
+{{- end }}
+
+{{/*
+Selector labels
+*/}}
+{{- define "flink-historyserver.selectorLabels" -}}
+app.kubernetes.io/name: {{ include "flink-historyserver.name" . }}
+app.kubernetes.io/instance: {{ .Release.Name }}
+{{- end }}
+
+{{/*
+Create the name of the service account to use
+*/}}
+{{- define "flink-historyserver.serviceAccountName" -}}
+{{- if .Values.serviceAccount.create }}
+{{- default (include "flink-historyserver.fullname" .) .Values.serviceAccount.name }}
+{{- else }}
+{{- default "default" .Values.serviceAccount.name }}
+{{- end }}
+{{- end }}
diff --git a/helm-charts/flink-historyserver/templates/configmap.yaml b/helm-charts/flink-historyserver/templates/configmap.yaml
new file mode 100644
index 0000000..5c3fd78
--- /dev/null
+++ b/helm-charts/flink-historyserver/templates/configmap.yaml
@@ -0,0 +1,13 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: historyserver-config
+data:
+  flink-conf.yaml: |
+    {{- .Values.flinkConfigMap | nindent 4 }}
+  log4j-console.properties: |
+    {{- .Values.log4jConfigMap | nindent 4 }}
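+  # NOTE: despite their names, flinkConfigMap / log4jConfigMap hold the full
+  # file contents (multi-line strings), not references to existing ConfigMaps.
+  # A minimal sketch of supplying them by hand (hypothetical local files):
+  #   helm install historyserver . --set-file flinkConfigMap=./flink-conf.yaml --set-file log4jConfigMap=./log4j-console.properties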
diff --git a/helm-charts/flink-historyserver/templates/efs.yaml b/helm-charts/flink-historyserver/templates/efs.yaml
new file mode 100644
index 0000000..8cb816f
--- /dev/null
+++ b/helm-charts/flink-historyserver/templates/efs.yaml
@@ -0,0 +1,42 @@
+apiVersion: storage.k8s.io/v1
+kind: StorageClass
+metadata:
+  name: {{ .Release.Name }}-efs-flink-history
+parameters:
+  provisioningMode: efs-ap
+  fileSystemId: "{{- .Values.efsFileSystemId }}"
+provisioner: "efs.csi.aws.com"
+---
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+  name: flink-historyserver-efs-pv
+spec:
+  capacity:
+    storage: "1Mi"
+  volumeMode: "Filesystem"
+  accessModes:
+    - "ReadWriteMany"
+  # 'Retain' means the EFS-backed volume must be cleaned up manually once testing is done
+  persistentVolumeReclaimPolicy: Retain
+  storageClassName: {{ .Release.Name }}-efs-flink-history
+  csi:
+    driver: "efs.csi.aws.com"
+    volumeHandle: "{{- .Values.efsFileSystemId }}"
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: flink-historyserver-efs-pvc
+spec:
+  accessModes:
+    - ReadWriteMany
+  storageClassName: {{ .Release.Name }}-efs-flink-history
+  resources:
+    requests:
+      storage: 1Mi
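+# Note: the PV above points the CSI driver straight at the EFS filesystem
+# (static provisioning), so the PVC binds to this pre-created PV rather than
+# asking the StorageClass to create one. With 'Retain', cleanup after
+# 'helm uninstall' is manual, e.g.:
+#   kubectl delete pvc -n default flink-historyserver-efs-pvc && kubectl delete pv flink-historyserver-efs-pv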
diff --git a/helm-charts/flink-historyserver/templates/historyserver.yaml b/helm-charts/flink-historyserver/templates/historyserver.yaml
new file mode 100644
index 0000000..1f0df7a
--- /dev/null
+++ b/helm-charts/flink-historyserver/templates/historyserver.yaml
@@ -0,0 +1,120 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  labels:
+    app: historyserver
+  name: historyserver
+  namespace: default
+spec:
+  progressDeadlineSeconds: 150
+  replicas: 1
+  revisionHistoryLimit: 10
+  selector:
+    matchLabels:
+      app: historyserver
+  strategy:
+    rollingUpdate:
+      maxSurge: 25%
+      maxUnavailable: 25%
+    type: RollingUpdate
+  template:
+    metadata:
+      labels:
+        app: historyserver
+    spec:
+      containers:
+      - args:
+        - history-server
+        command:
+        - /docker-entrypoint.sh
+        env:
+        - name: _POD_IP_ADDRESS
+          valueFrom:
+            fieldRef:
+              apiVersion: v1
+              fieldPath: status.podIP
+        image: flink:{{- .Values.flinkVersion }}
+        imagePullPolicy: IfNotPresent
+        name: flink-main-container
+        ports:
+        - containerPort: 8082
+          name: history
+          protocol: TCP
+        resources:
+          limits:
+            cpu: 300m
+            memory: 1536Mi
+          requests:
+            cpu: 300m
+            memory: 1536Mi
+        startupProbe:
+          # INT_MAX: effectively unlimited retries while the historyserver starts up
+          failureThreshold: 2147483647
+          httpGet:
+            path: /config
+            port: history
+            scheme: HTTP
+          periodSeconds: 1
+          successThreshold: 1
+          timeoutSeconds: 1
+        volumeMounts:
+        - mountPath: /opt/history/jobs
+          name: efs-flink-history
+        - mountPath: /opt/flink/conf
+          name: flink-config-volume
+      dnsPolicy: ClusterFirst
+      initContainers:
+      - command:
+        - sh
+        - -c
+        - chown 9999:9999 /opt/history/jobs && ls -lhd /opt/history/jobs
+        image: busybox:1.36.1
+        imagePullPolicy: IfNotPresent
+        name: efs-mount-ownership-fix
+        resources: {}
+        volumeMounts:
+        - mountPath: /opt/history/jobs
+          name: efs-flink-history
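+      # The chown init container above is needed because the flink image runs
+      # as UID/GID 9999 ("flink") while a fresh EFS root directory is owned by
+      # root; without it, the flink user could not write job archives here.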
+      restartPolicy: Always
+      schedulerName: default-scheduler
+      securityContext:
+        fsGroup: 9999
+      serviceAccountName: flink
+      terminationGracePeriodSeconds: 30
+      volumes:
+      - name: efs-flink-history
+        persistentVolumeClaim:
+          claimName: flink-historyserver-efs-pvc
+      - configMap:
+          defaultMode: 420
+          items:
+          - key: log4j-console.properties
+            path: log4j-console.properties
+          - key: flink-conf.yaml
+            path: flink-conf.yaml
+          name: historyserver-config
+        name: flink-config-volume
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app: historyserver
+  name: historyserver-rest
+  namespace: default
+spec:
+  ipFamilies:
+  - IPv4
+  ipFamilyPolicy: SingleStack
+  ports:
+  - name: history
+    port: 8082
+    protocol: TCP
+    targetPort: 8082
+  selector:
+    app: historyserver
+  sessionAffinity: None
+  type: ClusterIP
diff --git a/helm-charts/flink-historyserver/templates/tests/README.md b/helm-charts/flink-historyserver/templates/tests/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/helm-charts/flink-historyserver/values.schema.json b/helm-charts/flink-historyserver/values.schema.json
new file mode 100644
index 0000000..a3f82ae
--- /dev/null
+++ b/helm-charts/flink-historyserver/values.schema.json
@@ -0,0 +1,19 @@
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "type": "object",
+  "required": ["efsFileSystemId", "log4jConfigMap", "flinkConfigMap"],
+  "properties": {
+    "efsFileSystemId": {
+      "type": "string",
+      "minLength": 1
+    },
+    "log4jConfigMap": {
+      "type": "string",
+      "minLength": 1
+    },
+    "flinkConfigMap": {
+      "type": "string",
+      "minLength": 1
+    }
+  }
+}
diff --git a/helm-charts/flink-historyserver/values.yaml b/helm-charts/flink-historyserver/values.yaml
new file mode 100644
index 0000000..a7337cd
--- /dev/null
+++ b/helm-charts/flink-historyserver/values.yaml
@@ -0,0 +1,4 @@
+efsFileSystemId: ""
+flinkConfigMap: ""
+log4jConfigMap: ""
+flinkVersion: "1.16"
\ No newline at end of file
diff --git a/terraform/aws/.gitignore b/terraform/aws/.gitignore
new file mode 100644
index 0000000..1e82fc7
--- /dev/null
+++ b/terraform/aws/.gitignore
@@ -0,0 +1,2 @@
+# generated config dumps (log4jconfig.yaml, flinkconfig.yaml, flink_operator_config.yaml)
+*.yaml
diff --git a/terraform/aws/addons.tf b/terraform/aws/addons.tf
index 3dc81e8..7809b11 100644
--- a/terraform/aws/addons.tf
+++ b/terraform/aws/addons.tf
@@ -36,4 +36,28 @@ resource "aws_eks_addon" "ebs_provisioner" {
   depends_on = [
     aws_iam_role_policy_attachment.ebs_provisioner
   ]
-}
\ No newline at end of file
+}
+
+# EFS CSI Driver for HistoryServer
+resource "aws_iam_role" "efs_provisioner" {
+  name = "${var.cluster_name}-eks-efs-provisioner"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_with_oidc.json
+}
+
+resource "aws_iam_role_policy_attachment" "efs_provisioner" {
+  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonEFSCSIDriverPolicy"
+  role = aws_iam_role.efs_provisioner.name
+}
+
+resource "aws_eks_addon" "efs_provisioner" {
+  cluster_name = aws_eks_cluster.cluster.name
+  addon_name = "aws-efs-csi-driver"
+  # Pin the most recent driver build for the cluster's k8s version; list candidates with:
+  # AWS_PROFILE= eksctl utils describe-addon-versions --kubernetes-version 1.27 -v0 | jq '.Addons[] | select(.AddonName == "aws-efs-csi-driver") | .AddonVersions[0]'
+  addon_version = "v1.7.0-eksbuild.1"
+  resolve_conflicts_on_create = "OVERWRITE"
+  service_account_role_arn = aws_iam_role.efs_provisioner.arn
+  depends_on = [
+    aws_iam_role_policy_attachment.efs_provisioner
+  ]
+}
\ No newline at end of file
diff --git a/terraform/aws/cluster.tf b/terraform/aws/cluster.tf
index 2b9f7ff..a80581e 100644
--- a/terraform/aws/cluster.tf
+++ b/terraform/aws/cluster.tf
@@ -53,7 +53,7 @@ resource "aws_iam_openid_connect_provider" "cluster_oidc" {
 module "cluster_autoscaler_irsa" {
   source = "terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks"
 
-  role_name = "cluster_autoscaler"
+  role_name = "${var.cluster_name}_cluster_autoscaler"
 
   attach_cluster_autoscaler_policy = true
   cluster_autoscaler_cluster_ids = [
diff --git a/terraform/aws/efs.tf b/terraform/aws/efs.tf
new file mode 100644
index 0000000..74a2a23
--- /dev/null
+++ b/terraform/aws/efs.tf
@@ -0,0 +1,11 @@
+resource "aws_efs_file_system" "job_history" {
+  creation_token = "pforge-flink-job-history"
+}
+
+# one mount target per subnet, reachable from the nodes via the cluster security group
+resource "aws_efs_mount_target" "job_history" {
+  for_each = toset(data.aws_subnets.default.ids)
+  file_system_id = aws_efs_file_system.job_history.id
+  subnet_id = each.value
+  security_groups = [aws_eks_cluster.cluster.vpc_config[0].cluster_security_group_id]
+}
diff --git a/terraform/aws/flink_operator_config.tpl b/terraform/aws/flink_operator_config.tpl
new file mode 100644
index 0000000..d47464f
--- /dev/null
+++ b/terraform/aws/flink_operator_config.tpl
@@ -0,0 +1,7 @@
+kubernetes.operator.metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
+kubernetes.operator.metrics.reporter.prom.port: 9999
+# flink-conf.yaml is parsed as flat key/value pairs, so map-type options like
+# the annotations below take a comma-separated key:value list, not nested YAML
+kubernetes.jobmanager.annotations: prometheus.io/scrape:true,prometheus.io/port:9999
+jobmanager.archive.fs.dir: /opt/history/jobs
+historyserver.archive.fs.dir: /opt/history/jobs
\ No newline at end of file
diff --git a/terraform/aws/helm_historyserver.tf b/terraform/aws/helm_historyserver.tf
new file mode 100644
index 0000000..1df20ea
--- /dev/null
+++ b/terraform/aws/helm_historyserver.tf
@@ -0,0 +1,82 @@
+provider "kubernetes" {
+  host = aws_eks_cluster.cluster.endpoint
+  cluster_ca_certificate = base64decode(aws_eks_cluster.cluster.certificate_authority[0].data)
+  token = data.aws_eks_cluster_auth.cluster.token
+}
+
+data "aws_eks_cluster_auth" "cluster" {
+  name = aws_eks_cluster.cluster.name
+  depends_on = [
+    aws_eks_cluster.cluster,
+    helm_release.flink_operator
+  ]
+}
+
+data "kubernetes_config_map" "operator_default" {
+  # the historyserver Deployment reuses several keys/values from the default
+  # operator config, so we read the operator's ConfigMap here
+  metadata {
+    name = "flink-operator-config"
+    namespace = "default"
+  }
+  depends_on = [
+    aws_eks_cluster.cluster,
+    helm_release.flink_operator
+  ]
+}
+
+locals {
+  # drop whole-line comments; pushing them through the helm provider's set {}
+  # values breaks rendering
+  filtered_log4j_config = join("\n", [
+    for line in split("\n", data.kubernetes_config_map.operator_default.data["log4j-console.properties"]) :
+    line if !startswith(line, "#")
+  ])
+
+  filtered_flink_config = join("\n", [
+    for line in split("\n", data.kubernetes_config_map.operator_default.data["flink-conf.yaml"]) :
+    line if !startswith(line, "#")
+  ])
+}
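+# NOTE: startswith() requires Terraform >= 1.3. A sketch of the filter's effect
+# on a hypothetical input: "# header\nrest.port: 8081" becomes "rest.port: 8081";
+# only whole-line comments are dropped, indented or inline '#'s are kept.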
split("\n", data.kubernetes_config_map.operator_default.data["flink-conf.yaml"]) : + line if !startswith(line, "#") + ]) +} + +resource "local_file" "log4j_config_output" { + filename = "${path.module}/log4jconfig.yaml" + content = local.filtered_log4j_config +} + +resource "local_file" "flink_config_output" { + filename = "${path.module}/flinkconfig.yaml" + content = local.filtered_flink_config +} + + +resource "helm_release" "flink_historyserver" { + name = "flink-historyserver" + chart = "../../helm-charts/flink-historyserver" + namespace = "default" + create_namespace = false + + set { + name = "efsFileSystemId" + value = "${aws_efs_file_system.job_history.id}" + } + set { + name = "flinkVersion" + value = "${var.flink_version}" + } + set { + name = "log4jConfigMap" + value = local_file.log4j_config_output.content + } + set { + name = "flinkConfigMap" + value = local_file.flink_config_output.content + } + wait = true + depends_on = [ + aws_eks_cluster.cluster, + helm_release.flink_operator + ] +} diff --git a/terraform/aws/main.tf b/terraform/aws/main.tf index 76323a0..8a61e82 100644 --- a/terraform/aws/main.tf +++ b/terraform/aws/main.tf @@ -1,7 +1,7 @@ terraform { backend "s3" { # FIXME: Investigate if we need the dynamodb locking here? - bucket = "pangeo-forge-federation-tfstate" + bucket = "pangeo-forge-federation-gcorradini-tfstate-v3" key = "terraform" region = "us-west-2" } @@ -28,4 +28,8 @@ data "aws_subnets" "default" { values = [data.aws_vpc.default.id] } } +data "aws_security_group" "default" { + vpc_id = data.aws_vpc.default.id + name = "default" +} diff --git a/terraform/aws/operator.tf b/terraform/aws/operator.tf index 90e3a33..6a090f4 100644 --- a/terraform/aws/operator.tf +++ b/terraform/aws/operator.tf @@ -17,6 +17,10 @@ resource "helm_release" "cert_manager" { ] } +resource "local_file" "flink_operator_config" { + content = templatefile("flink_operator_config.tpl",{}) + filename = "flink_operator_config.yaml" +} resource "helm_release" "flink_operator" { name = "flink-operator" @@ -42,14 +46,7 @@ resource "helm_release" "flink_operator" { # Enable prometheus metrics for all set { name = "defaultConfiguration.flink-conf\\.yaml" - value = yamlencode({ - "kubernetes.operator.metrics.reporter.prom.class" : "org.apache.flink.metrics.prometheus.PrometheusReporter", - "kubernetes.operator.metrics.reporter.prom.port" : "9999", - "kubernetes.jobmanager.annotations" : { - "prometheus.io/scrape" : "true" - "prometheus.io/port" : "9999" - } - }) + value = local_file.flink_operator_config.content } set { diff --git a/terraform/aws/variables.tf b/terraform/aws/variables.tf index 6501e88..61ea1f1 100644 --- a/terraform/aws/variables.tf +++ b/terraform/aws/variables.tf @@ -28,12 +28,19 @@ variable "max_instances" { } variable "flink_operator_version" { - default = "1.5.0" + default = "1.6.1" description = <<-EOT Version of Flink Operator to install. EOT } +variable "flink_version" { + default = "1.16" + description = <<-EOT + Version of Flink to install. + EOT +} + variable "cluster_autoscaler_version" { default = "9.21.0" description = <<-EOT