This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Remove storage class, not needed anymore in the operator. (#854)
* Fix for readwrite, which needs a security context.

* Some minor cleanup.

* Copied the right version of the Lyft golang types.

* Fixed SecurityContext in FlinkRunner.

* Applying correct pod security context for fsGroup, when FlinkRunner mounts writable volumes.

* Removed usages of persistentStorageClass from the cloudflow Helm chart.

* Some comments on PVCs for Flink and Spark.

* Added "cloudflow.kafka.bootstrap-servers" back in for older apps running 2.0.10.

* Minor change.

* Added draft release notes for 2.0.13.
RayRoestenburg authored Nov 9, 2020
1 parent a78e332 commit e2e0b9a
Showing 14 changed files with 73 additions and 68 deletions.
@@ -133,6 +133,9 @@ object ConfigurationScopeLayering {
appConfig: Config,
clusterSecretConfigs: Map[String, Config]): Config = {
val defaultClusterConfig = clusterSecretConfigs.get(TopicActions.DefaultConfigurationName)
// Adding bootstrap-servers key for backwards compatibility
val kafkaBootstrapServersCompat2010 = defaultClusterConfig.map(_.getString("bootstrap.servers"))

val portMappingConfigs = deployment.portMappings.flatMap {
case (port, topic) =>
Try {
@@ -160,9 +163,15 @@
"""))
}.toOption
}
portMappingConfigs.foldLeft(streamletConfig) { (acc, el) =>
acc.withFallback(el)
}
val conf = portMappingConfigs
.foldLeft(streamletConfig) { (acc, el) =>
acc.withFallback(el)
}
kafkaBootstrapServersCompat2010
.map { bs =>
conf.withFallback(ConfigFactory.parseString(s"""cloudflow.kafka.bootstrap-servers="${bs}""""))
}
.getOrElse(conf)
}

private def extractPodsConfig(streamletConfig: Config) =
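As an aside, a minimal sketch of how the compatibility fallback above behaves with Typesafe Config: the legacy `cloudflow.kafka.bootstrap-servers` key only applies when nothing higher in the layering sets it, which is what keeps streamlets built against 2.0.10 working. The config values below are illustrative, not taken from this commit.

[source,scala]
----
import com.typesafe.config.{ Config, ConfigFactory }

object CompatFallbackSketch extends App {
  // Layered streamlet config produced by ConfigurationScopeLayering (contents illustrative).
  val streamletConfig: Config =
    ConfigFactory.parseString("""cloudflow.runner.streamlet.class = "sensors.SensorIngress"""")

  // Compat entry derived from the default Kafka cluster secret, mirroring the change above.
  val compat: Config =
    ConfigFactory.parseString("""cloudflow.kafka.bootstrap-servers="default-named-cluster:9092"""")

  val layered = streamletConfig.withFallback(compat)

  // A 2.0.10-era app that still reads the legacy key resolves it from the fallback.
  assert(layered.getString("cloudflow.kafka.bootstrap-servers") == "default-named-cluster:9092")
}
----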
@@ -54,10 +54,9 @@ final class AkkaRunner(akkaRunnerDefaults: AkkaRunnerDefaults) extends Runner[De
def editor = (obj: Deployment, newMetadata: ObjectMeta) ⇒ {
obj.copy(metadata = newMetadata)
}
def configEditor = (obj: ConfigMap, newMetadata: ObjectMeta) ⇒ obj.copy(metadata = newMetadata)
val runtime = Runtime
def resourceDefinition = implicitly[ResourceDefinition[Deployment]]
val requiresPersistentVolume = false
def configEditor = (obj: ConfigMap, newMetadata: ObjectMeta) ⇒ obj.copy(metadata = newMetadata)
val runtime = Runtime
def resourceDefinition = implicitly[ResourceDefinition[Deployment]]

def appActions(app: CloudflowApplication.CR,
namespace: String,
@@ -165,8 +164,9 @@

val volume = Volume(configMapName, ConfigMapVolumeSource(configMapName))

// Streamlet volume mounting
val streamletToDeploy = app.spec.streamlets.find(streamlet ⇒ streamlet.name == deployment.streamletName)

// Streamlet volume mounting (Defined by Streamlet.volumeMounts API)
val pvcRefVolumes =
streamletToDeploy.map(_.descriptor.volumeMounts.map(mount ⇒ Volume(mount.name, PersistentVolumeClaimRef(mount.pvcName))).toList)
val pvcVolumeMounts = streamletToDeploy
@@ -225,20 +225,20 @@ final class AkkaRunner(akkaRunnerDefaults: AkkaRunnerDefaults) extends Runner[De
)

// This is the group id of the user in the streamlet container,
// its need to make volumes managed by certain volume plugins writable.
// it needs to make volumes managed by certain volume plugins writable.
// If the image used with the container changes, this value most likely
// have to be updated
// will have to be updated
val dockerContainerGroupId = Runner.DockerContainerGroupId
// We only need to set this when we want to write to the a volume in a pod
val fsGroup = pvcVolumeMounts
.find(volume ⇒ volume.readOnly == true)
// We only need to set this when we want to write to a volume in a pod
val securityContext = pvcVolumeMounts
.find(volume ⇒ volume.readOnly == false)
.flatMap(_ ⇒ Some(PodSecurityContext(fsGroup = Some(dockerContainerGroupId))))

val podSpec =
Pod
.Spec(serviceAccountName = Name.ofServiceAccount(),
volumes = pvcRefVolumes.getOrElse(List.empty[Volume]),
securityContext = fsGroup)
securityContext = securityContext)
.addContainer(container)
.addVolume(volume)
.addVolume(secretVolume)
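A self-contained sketch of the rule the AkkaRunner change applies: `fsGroup` is only set on the pod when at least one mounted PVC is writable, since read-only mounts do not need group write access. The `Mount` and `PodSecurityContext` case classes and the group id below are simplified, hypothetical stand-ins for the skuber types and `Runner.DockerContainerGroupId`.

[source,scala]
----
// Hypothetical, simplified stand-ins for the skuber model used by the operator.
final case class Mount(name: String, readOnly: Boolean)
final case class PodSecurityContext(fsGroup: Option[Int] = None)

object FsGroupSketch extends App {
  val dockerContainerGroupId = 185 // placeholder for Runner.DockerContainerGroupId

  // Only writable volume mounts require the pod-level fsGroup.
  def securityContextFor(mounts: List[Mount]): Option[PodSecurityContext] =
    if (mounts.exists(!_.readOnly)) Some(PodSecurityContext(fsGroup = Some(dockerContainerGroupId)))
    else None

  // Read-only mounts: no pod security context is set.
  assert(securityContextFor(List(Mount("config", readOnly = true))).isEmpty)
  // A writable mount: fsGroup is set so the volume plugin makes the volume group-writable.
  assert(securityContextFor(List(Mount("state", readOnly = false))).contains(PodSecurityContext(Some(dockerContainerGroupId))))
}
----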
@@ -163,6 +163,13 @@ final class FlinkRunner(flinkRunnerDefaults: FlinkRunnerDefaults) extends Runner
val image = deployment.image
val streamletToDeploy = app.spec.streamlets.find(streamlet ⇒ streamlet.name == deployment.streamletName)

val dockerContainerGroupId = Runner.DockerContainerGroupId
val securityContext =
if (streamletToDeploy.exists(_.descriptor.volumeMounts.exists(_.accessMode == "ReadWriteMany")))
Some(PodSecurityContext(fsGroup = Some(dockerContainerGroupId)))
else
None

val volumes = makeVolumesSpec(deployment, streamletToDeploy) ++ getVolumes(podsConfig, PodsConfig.CloudflowPodName)
val volumeMounts = makeVolumeMountsSpec(streamletToDeploy) ++ getVolumeMounts(podsConfig, PodsConfig.CloudflowPodName)

@@ -214,6 +221,7 @@ final class FlinkRunner(flinkRunnerDefaults: FlinkRunnerDefaults) extends Runner
val _spec = Spec(
image = image,
jarName = RunnerJarName,
securityContext = securityContext,
parallelism = parallelismForResource,
entryClass = RuntimeMainClass,
volumes = volumes,
@@ -345,7 +353,7 @@ final class FlinkRunner(flinkRunnerDefaults: FlinkRunnerDefaults) extends Runner
// secret
val secretVolume = Volume("secret-vol", Volume.Secret(deployment.secretName))

// Streamlet volume mounting
// Streamlet volume mounting (Defined by Streamlet.volumeMounts API)
val streamletPvcVolume = streamletToDeploy.toVector.flatMap(_.descriptor.volumeMounts.map { mount ⇒
Volume(mount.name, Volume.PersistentVolumeClaimRef(mount.pvcName))
})
@@ -357,10 +365,6 @@ final class FlinkRunner(flinkRunnerDefaults: FlinkRunnerDefaults) extends Runner
* For every volume we need a volume mount spec
* // "volumeMounts": [
* // {
* // "name": "persistent-storage",
* // "mountPath": "/mnt/flink/storage"
* // },
* // {
* // "name": "config-vol",
* // "mountPath": "/etc/cloudflow-runner"
* // },
@@ -385,7 +389,6 @@ final class FlinkRunner(flinkRunnerDefaults: FlinkRunnerDefaults) extends Runner

object FlinkResource {

final case class SecurityContext(fsGroup: Option[Int])
final case class HostPath(path: String, `type`: String)
final case class NamePath(name: String, path: String)
final case class NamePathSecretType(name: String, path: String, secretType: String = "Generic")
@@ -441,6 +444,7 @@ object FlinkResource {
ImagePullPolicy apiv1.PullPolicy `json:"imagePullPolicy,omitempty" protobuf:"bytes,14,opt,name=imagePullPolicy,casttype=PullPolicy"`
ImagePullSecrets []apiv1.LocalObjectReference `json:"imagePullSecrets,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,15,rep,name=imagePullSecrets"`
ServiceAccountName string `json:"serviceAccountName,omitempty"`
SecurityContext *apiv1.PodSecurityContext `json:"securityContext,omitempty"`
FlinkConfig FlinkConfig `json:"flinkConfig"`
FlinkVersion string `json:"flinkVersion"`
TaskManagerConfig TaskManagerConfig `json:"taskManagerConfig,omitempty"`
@@ -452,6 +456,7 @@ object FlinkResource {
// Deprecated: use SavepointPath instead
SavepointInfo SavepointInfo `json:"savepointInfo,omitempty"`
SavepointPath string `json:"savepointPath,omitempty"`
SavepointDisabled bool `json:"savepointDisabled"`
DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"`
RPCPort *int32 `json:"rpcPort,omitempty"`
BlobPort *int32 `json:"blobPort,omitempty"`
@@ -465,13 +470,16 @@ object FlinkResource {
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`
ForceRollback bool `json:"forceRollback"`
MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"`
} */
TearDownVersionHash string `json:"tearDownVersionHash,omitempty"`
}
*/

final case class Spec(
image: String = "", // required parameter
imagePullPolicy: String = "Always",
flinkVersion: String = "1.10",
serviceAccountName: String = Name.ofServiceAccount,
securityContext: Option[PodSecurityContext] = None,
jarName: String,
parallelism: Int,
entryClass: String = "",
@@ -501,8 +509,7 @@ object FlinkResource {
submissionTime: Option[String] // may need to parse it as a date later on
)

implicit val hostPathFmt: Format[HostPath] = Json.format[HostPath]
implicit val securityContextFmt: Format[SecurityContext] = Json.format[SecurityContext]
implicit val hostPathFmt: Format[HostPath] = Json.format[HostPath]

implicit val namePathFmt: Format[NamePath] = Json.format[NamePath]
implicit val namePathSecretTypeFmt: Format[NamePathSecretType] = Json.format[NamePathSecretType]
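And the Flink side in the same style: the FlinkRunner only attaches a pod security context when a streamlet declares a volume mount with the `ReadWriteMany` access mode, because only then does the Flink job need to write to the shared volume. Again, the `VolumeMount` case class and group id are illustrative stand-ins, not the actual cloudflow descriptor types.

[source,scala]
----
// Illustrative stand-ins; not the actual cloudflow descriptor or skuber types.
final case class VolumeMount(name: String, accessMode: String)
final case class PodSecurityContext(fsGroup: Option[Int] = None)

object FlinkSecurityContextSketch extends App {
  val dockerContainerGroupId = 185 // placeholder for Runner.DockerContainerGroupId

  def securityContextFor(mounts: Seq[VolumeMount]): Option[PodSecurityContext] =
    if (mounts.exists(_.accessMode == "ReadWriteMany"))
      Some(PodSecurityContext(fsGroup = Some(dockerContainerGroupId)))
    else
      None

  // A state volume that must be writable from the job pods triggers fsGroup.
  assert(securityContextFor(Seq(VolumeMount("flink-storage", "ReadWriteMany"))).nonEmpty)
  // Read-only configuration volumes do not.
  assert(securityContextFor(Seq(VolumeMount("config", "ReadOnlyMany"))).isEmpty)
}
----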
@@ -411,9 +411,6 @@ object PodsConfig {
}.toList
}

/**
* TODO How to make this optional?
*/
implicit val sourceConfReader: ValueReader[Volume.Source] = ValueReader.relative { config =>
val res: Option[Volume.Source] = config.root().keySet().toArray().headOption.map {
case "secret" =>
@@ -68,8 +68,6 @@ final class SparkRunner(sparkRunnerDefaults: SparkRunnerDefaults) extends Runner
def resourceDefinition = implicitly[ResourceDefinition[CR]]
def prometheusConfig = PrometheusConfig(prometheusRules)

val requiresPersistentVolume = true

val DriverPod = "driver"
val ExecutorPod = "executor"

@@ -204,9 +202,7 @@ final class SparkRunner(sparkRunnerDefaults: SparkRunnerDefaults) extends Runner

val streamletToDeploy = app.spec.streamlets.find(streamlet ⇒ streamlet.name == deployment.streamletName)

// Streamlet volume mounting
// Volume mounting

// Streamlet volume mounting (Defined by Streamlet.volumeMounts API)
val streamletPvcVolume = streamletToDeploy.toSeq.flatMap(_.descriptor.volumeMounts.map { mount ⇒
Volume(mount.name, Volume.PersistentVolumeClaimRef(mount.pvcName))
})
@@ -241,8 +237,6 @@ final class SparkRunner(sparkRunnerDefaults: SparkRunnerDefaults) extends Runner
Map(CloudflowLabels.StreamletNameLabel -> deployment.streamletName, CloudflowLabels.AppIdLabel -> appId)
.mapValues(Name.ofLabelValue)

//import ctx.sparkRunnerDefaults._

val driver = addDriverResourceRequirements(
Driver(
javaOptions = getJavaOptions(podsConfig, DriverPod).orElse(driverDefaults.javaOptions),
@@ -186,7 +186,7 @@ class ConfigurationScopeLayeringSpec
maybeValidConfig.getConfig("connection-config").getString("connection.foo.bar") mustBe "default-baz"
maybeValidConfig.getConfig("producer-config").getString("producer.foo.bar") mustBe "default-baz"
maybeValidConfig.getConfig("consumer-config").getString("consumer.foo.bar") mustBe "default-baz"

configs.streamlet.getString("cloudflow.kafka.bootstrap-servers") mustBe "default-named-cluster:9092"
// 'sometimes' port uses global topic configuration from app config, and falls back 'default' named config for bootstrap.servers and other config
val sometimesPort = "sometimes"
val sometimesConfig = configs.streamlet.getConfig(s"cloudflow.runner.streamlet.context.port_mappings.$sometimesPort.config")
@@ -38,7 +38,7 @@ See xref:how-to-install-and-use-strimzi.adoc[Installing Kafka with Strimzi] as a

=== Storage requirements (for use with Spark or Flink)

**If you plan to write Cloudflow applications using Spark or Flink**, the Kubernetes cluster will need to have a storage class of the `ReadWriteMany` type installed. The name of the `ReadWriteMany` storage class name is required when installing Cloudflow.
**If you plan to write Cloudflow applications using Spark or Flink**, the Kubernetes cluster will need to have a storage class of the `ReadWriteMany` type installed.

For testing purposes, we suggest using the NFS Server Provisioner, which can be found here: https://github.com/helm/charts/tree/master/stable/nfs-server-provisioner[NFS Server Provisioner Helm chart]

@@ -73,8 +73,6 @@ nfs cloudflow-nfs 29s
standard (default) kubernetes.io/gce-pd 2m57s
----

NOTE: When installing Cloudflow, we will use the name `nfs` to indicate that Cloudflow should use the NFS storage class.

NOTE:: The documented NFS storage class is very portable and has been verified to work on GKE, EKS, AKS and Openshift.

== More Storage Options
@@ -24,29 +24,8 @@ First, we add the Cloudflow Helm repository and update the local index:
helm repo add cloudflow-helm-charts https://lightbend.github.io/cloudflow-helm-charts/
helm repo update

=== Installing (with persistent storage support for Spark or Flink)
=== Installing

Spark and Flink require persistent storage to keep state.
Cloudflow creates persistent volume claims for this purpose, using a storage class. The `persistentStorageClass` parameter sets the storage class to `nfs`. This storage class, as mentioned in xref:installation-prerequisites.adoc[], has to be of the type `ReadWriteMany`. In our example, we are using the `nfs` storage class.

cloudflow_operator.persistentStorageClass=nfs

The `kafkaClusters.default.bootstrapServers` parameter sets the address and port of the Kafka cluster that Cloudflow will use. In this example, we have used the address of a Strimzi created Kafka cluster located in the `cloudflow` namespace.

kafkaClusters.default.bootstrapServers=cloudflow-strimzi-kafka-bootstrap.cloudflow:9092

The following command installs Cloudflow using the Helm chart:

[subs="attributes+"]
helm install cloudflow cloudflow-helm-charts/cloudflow --namespace cloudflow \
--version "{cloudflow-version}" \
--set cloudflow_operator.persistentStorageClass=nfs \
--set kafkaClusters.default.bootstrapServers=cloudflow-strimzi-kafka-bootstrap.cloudflow:9092


=== Installing (without persistent storage support)

If you do not intend to use Spark or Flink, you can omit the `persistentStorageClass` parameter.
The `kafkaClusters.default.bootstrapServers` parameter sets the address and port of the default Kafka cluster that Cloudflow will use. In this example, we have used the address of a Strimzi created Kafka cluster located in the `cloudflow` namespace.

kafkaClusters.default.bootstrapServers=cloudflow-strimzi-kafka-bootstrap.cloudflow:9092
@@ -15,7 +15,6 @@ To upgrade Cloudflow in your cluster to the latest version run
----
helm upgrade cloudflow cloudflow-helm-charts/cloudflow \
--namespace cloudflow \
--set cloudflow_operator.persistentStorageClass=nfs \
--set kafkaClusters.default.bootstrapServers=cloudflow-strimzi-kafka-bootstrap.cloudflow:9092
----

@@ -30,7 +29,6 @@ To upgrade Cloudflow to a specific version, add the _--version_ flag. For exampl
----
helm upgrade cloudflow cloudflow-helm-charts/cloudflow \
--namespace cloudflow \
--set cloudflow_operator.persistentStorageClass=nfs \
--version="{cloudflow-version}" \
--set kafkaClusters.default.bootstrapServers=cloudflow-strimzi-kafka-bootstrap.cloudflow:9092
----
@@ -8,7 +8,7 @@ Deploys a Cloudflow application to the cluster.
== Prerequisites

To deploy a Flink or Spark application it is necessary that a PVC named `cloudflow-flink' or `cloudflow-spark` already exists in a namespace that is equal
to the application name. This name is the exactly the same as the json file you'll use to deploy, excluding the extension. e.g. For a file `swiss-knife.json` the namespace would be `swiss-knife`. Cloudflow will automatically mount this PVCs in the streamlets themselves to allow any job, Flink or Spark, to store its state.
to the application name. This name is exactly the same as the json file you'll use to deploy, excluding the extension. e.g. For a file `swiss-knife.json` the namespace would be `swiss-knife`. Cloudflow will automatically mount this PVCs in the streamlets themselves to allow any job, Flink or Spark, to store its state.

== Synopsis

@@ -30,7 +30,7 @@ Each outlet also allows the user to define a partitioning function that will be

== `StreamletShape`

The `StreamletShape` captures the connectivity and compatibility details of a Spark-based streamlet.
The `StreamletShape` captures the connectivity and compatibility details of a Flink-based streamlet.
It captures which—and how many—inlets and outlets the streamlet has.

The `sbt-cloudflow` plugin extracts—amongst other things—the _shape_ from the streamlet that it finds on the classpath.
@@ -111,8 +111,7 @@ In Cloudflow, streamlets communicate with each other through Kafka - hence the s

A Pipeline application can be stateful and use the full capabilities of _keyed state_ and _operator state_ in Flink. Stateful operators are fully supported within a Flink streamlet. All states are stored in state stores deployed on Kubernetes using _Persistent Volume Claims_ (PVCs) backed by a storage class that allows for _access mode_ `ReadWriteMany`.

PVCs are automatically provisioned for each Cloudflow application.
The volumes are claimed for as long as the Pipeline application is deployed, allowing for seamless re-deployment, upgrades, and recovery in case of failure.
To deploy a Flink application it is necessary that a PVC already exists in the namespace for the application. Cloudflow will automatically mount this PVC in the streamlets themselves to allow state storage, as long as it has a `/mnt/flink/storage` mount path. Cloudflow will look for a PVC named `cloudflow-flink` by default that mounts this path. It is also possible to create a differently named PVC, that mounts `/mnt/flink/storage` and mount it using a configuration file, as described in xref:develop:cloudflow-configuration.adoc[The Configuration Model]. The access mode for the PVC has to be `ReadWriteMany`.

Flink's runtime encodes all state and writes them into checkpoints. And since checkpoints in Flink offer an exactly-once guarantee of semantics, all application state are preserved safely throughout the lifetime of the streamlet. In case of failures, recovery will be done from checkpoints, and there will be no data loss.

@@ -189,8 +189,7 @@ This includes stateful processing, from time-based aggregations to arbitrary sta
All stateful processing relies on snapshots and a state store for the bookkeeping of the offsets processed and the computed state at any time.
In Cloudflow, this state store is deployed on Kubernetes using _Persistent Volume Claims_ (PVCs) backed by a storage class that allows for _access mode_ `ReadWriteMany`.

PVCs are automatically provisioned for each Cloudflow application.
The volumes are claimed for as long as the Pipeline application is deployed, allowing for seamless re-deployment, upgrades, and recovery in case of failure.
To deploy a Spark application it is necessary that a PVC already exists in the namespace for the application. Cloudflow will automatically mount this PVC in the streamlets themselves to allow state storage, as long as it has a `/mnt/spark/storage` mount path. Cloudflow will look for a PVC named `cloudflow-spark` by default that mounts this path. It is also possible to create a differently named PVC, that mounts `/mnt/spark/storage` and mount it using a configuration file, as described in xref:develop:cloudflow-configuration.adoc[The Configuration Model]. The access mode for the PVC has to be `ReadWriteMany`.

We can use the managed storage to safely store checkpoint data.
Checkpoint information is managed by Cloudflow for all streamlets that use the `context.writeStream` method.