From 688b570594e59c64c8257b38925f079cd824cbe8 Mon Sep 17 00:00:00 2001 From: tsc Date: Wed, 13 Jan 2021 00:34:36 +0100 Subject: [PATCH 1/6] Drop unnecessary object name conversion --- .../src/main/scala/cloudflow/operator/action/Name.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/Name.scala b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/Name.scala index c68a6c602..9d6377e1d 100644 --- a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/Name.scala +++ b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/Name.scala @@ -125,7 +125,7 @@ object Name { makeDNS1039Compatible(fixDots(streamletDeploymentName)) def ofConfigMap(streamletDeploymentName: String) = - makeDNS1123CompatibleSubDomainName(s"configmap-${fixDots(streamletDeploymentName)}") + makeDNS1123CompatibleSubDomainName(s"configmap-${fixDots(streamletDeploymentName)}") // TODO - why append '-service' (line 145) but prepend 'configmap-'? - suffixes seem the more k8s-idiomatic choice def ofLabelValue(name: String) = truncateTo63Characters(name) @@ -142,7 +142,7 @@ object Name { val ofContainerPrometheusExporterPort = max15Chars("prom-metrics") def ofService(streamletDeploymentName: String) = - truncateTo63CharactersWithSuffix(makeDNS1039Compatible(ofPod(streamletDeploymentName)), "-service") + truncateTo63CharactersWithSuffix(ofPod(streamletDeploymentName), "-service") def ofAdminService(streamletDeploymentName: String) = s"${ofPod(streamletDeploymentName)}-admin-service" From 8dfedf9fb2cf93f678cb8dedd0e463607b73b9c5 Mon Sep 17 00:00:00 2001 From: tsc Date: Wed, 13 Jan 2021 00:58:26 +0100 Subject: [PATCH 2/6] Avoid code duplication for building k8s service-name based on app-id and streamlet-name --- .../cloudflow/operator/action/EndpointActions.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/EndpointActions.scala b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/EndpointActions.scala index c3cafa537..8b1ccbe7b 100644 --- a/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/EndpointActions.scala +++ b/core/cloudflow-operator-actions/src/main/scala/cloudflow/operator/action/EndpointActions.scala @@ -40,17 +40,25 @@ object EndpointActions { def distinctEndpoints(app: CloudflowApplication.Spec) = app.deployments.flatMap(deployment => deployment.endpoint).toSet + def deploymentsOfEndpoints(app: CloudflowApplication.Spec) = + app.deployments + .flatMap(deployment => deployment.endpoint.map(ep => ep -> deployment.name)) + .toMap // use deployment name rather than re-constructing the name again (by duplicating code) + + val currentDeploymentsOfEndpoints = currentApp.map(current => deploymentsOfEndpoints(current.spec)).getOrElse(Map.empty) + val newDeploymentsOfEndpoints = deploymentsOfEndpoints(newApp.spec) + val currentEndpoints = currentApp.map(cr => distinctEndpoints(cr.spec)).getOrElse(Set.empty[Endpoint]) val newEndpoints = distinctEndpoints(newApp.spec) val deleteActions = (currentEndpoints -- newEndpoints).flatMap { endpoint => Seq( - Action.delete[Service](Name.ofService(StreamletDeployment.name(newApp.spec.appId, endpoint.streamlet)), newApp.namespace) + Action.delete[Service](Name.ofService(currentDeploymentsOfEndpoints(endpoint)), newApp.namespace) ) }.toList val createActions = (newEndpoints -- currentEndpoints).flatMap { endpoint => Seq( - createServiceAction(endpoint, newApp, StreamletDeployment.name(newApp.spec.appId, endpoint.streamlet)) + createServiceAction(endpoint, newApp, newDeploymentsOfEndpoints(endpoint)) ) }.toList deleteActions ++ createActions From 47289fdc52753028145419208e5ec8fedb7c2a3f Mon Sep 17 00:00:00 2001 From: tsc Date: Fri, 21 May 2021 08:35:33 +0200 Subject: [PATCH 3/6] Add specs for reproducing unexpected behavior --- core/build.sbt | 3 +- .../src/test/avro/TestData.avsc | 14 +++ .../akkastream/testkit/ReproduceErrors.scala | 117 ++++++++++++++++++ 3 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 core/cloudflow-akka-testkit/src/test/avro/TestData.avsc create mode 100644 core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala diff --git a/core/build.sbt b/core/build.sbt index 8af15b078..9f818c538 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -196,7 +196,8 @@ lazy val akkastreamTestkit = ) .settings( javacOptions += "-Xlint:deprecation", - javacOptions += "-Xlint:unchecked" + javacOptions += "-Xlint:unchecked", + (sourceGenerators in Test) += (avroScalaGenerateSpecific in Test).taskValue ) lazy val akkastreamTests = diff --git a/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc b/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc new file mode 100644 index 000000000..f0d20b850 --- /dev/null +++ b/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc @@ -0,0 +1,14 @@ +{ + "namespace": "cloudflow.akkastream.testdata", + "type": "record", + "name": "TestData", + "fields":[ + { + "name": "id", "type": "int" + }, + { + "name": "name", "type": "string" + } + ] +} + diff --git a/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala b/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala new file mode 100644 index 000000000..9573ac6b0 --- /dev/null +++ b/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloudflow.akkastream.testkit + +import akka.actor.ActorSystem +import akka.stream.scaladsl.{ RunnableGraph, Source } +import akka.testkit.TestKit +import cloudflow.akkastream.{ AkkaStreamlet, AkkaStreamletLogic } +import cloudflow.akkastream.scaladsl.RunnableGraphStreamletLogic +import cloudflow.streamlets.StreamletShape +import cloudflow.akkastream.testdata.TestData +import cloudflow.akkastream.testkit.scaladsl.{ AkkaStreamletTestKit, Completed } +import cloudflow.streamlets.avro.{ AvroInlet, AvroOutlet } +import org.scalatest._ + +import scala.concurrent.Future + +class ReproduceErrors extends WordSpec with MustMatchers with BeforeAndAfterAll { + private implicit val system = ActorSystem("CloudflowAkkaTestkitErrorReproducerSpec") + + override def afterAll: Unit = + TestKit.shutdownActorSystem(system) + + object TestFixture { + val msgs = List.tabulate(10)(i => TestData(i, i.toString)) + + class TestStreamlet extends AkkaStreamlet { + val in = AvroInlet[TestData]("in") + val out = AvroOutlet[TestData]("out") + + override val shape: StreamletShape = StreamletShape(in).withOutlets(out) + + override protected def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() { + override def runnableGraph(): RunnableGraph[_] = { + val write = sinkRef(out).write _ + sourceWithCommittableContext(in) + .mapAsync(parallelism = 1) { element => + Future { + write(element) + } + } + .to(committableSink) + } + } + } + } + + import TestFixture._ + + "Cloudlfow Akka TestKit" should { + "emit a dedicated completed messages after each message emitted via sinkRef.write, but should not" in { + val testkit = AkkaStreamletTestKit(system) + val s = new TestStreamlet() + val in = testkit.inletFromSource(s.in, Source(msgs)) + val out = testkit.outletAsTap(s.out) + + testkit.run( + s, + List(in), + List(out), + () => { + val gotAll = out.probe.receiveN(msgs.size * 2) + val grouped = gotAll.groupBy { + case _: Completed => Completed + case _ => TestData + } + + grouped(TestData) must have size msgs.size + + val resultWithoutIndex = grouped(TestData).asInstanceOf[Seq[(_, TestData)]].map(_._2) + resultWithoutIndex must contain allElementsOf msgs + + grouped(Completed) must have size 1 // but is actually of size msgs.size + } + ) + } + + (0 until 300).foreach { i => + s"maintain the order in which messages are emitted via sinkRef.write (run #$i), but should not" in { + val testkit = AkkaStreamletTestKit(system) + val s = new TestStreamlet() + val in = testkit.inletFromSource(s.in, Source(msgs)) + val out = testkit.outletAsTap(s.out) + + testkit.run( + s, + List(in), + List(out), + () => { + val got = out.probe + .receiveN(msgs.size * 2) + .filter(_ != Completed) // compensate for Completed msg being published after each msg as in upper test case + .map { + case (_, m: TestData) => m + } + got mustEqual msgs + } + ) + } + } + } + +} From c9ad68d4f7082f698fe1c02390a2604e06d16df7 Mon Sep 17 00:00:00 2001 From: tsc Date: Fri, 21 May 2021 09:16:03 +0200 Subject: [PATCH 4/6] Revert "Add specs for reproducing unexpected behavior" This reverts commit 47289fdc52753028145419208e5ec8fedb7c2a3f. --- core/build.sbt | 3 +- .../src/test/avro/TestData.avsc | 14 --- .../akkastream/testkit/ReproduceErrors.scala | 117 ------------------ 3 files changed, 1 insertion(+), 133 deletions(-) delete mode 100644 core/cloudflow-akka-testkit/src/test/avro/TestData.avsc delete mode 100644 core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala diff --git a/core/build.sbt b/core/build.sbt index 9f818c538..8af15b078 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -196,8 +196,7 @@ lazy val akkastreamTestkit = ) .settings( javacOptions += "-Xlint:deprecation", - javacOptions += "-Xlint:unchecked", - (sourceGenerators in Test) += (avroScalaGenerateSpecific in Test).taskValue + javacOptions += "-Xlint:unchecked" ) lazy val akkastreamTests = diff --git a/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc b/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc deleted file mode 100644 index f0d20b850..000000000 --- a/core/cloudflow-akka-testkit/src/test/avro/TestData.avsc +++ /dev/null @@ -1,14 +0,0 @@ -{ - "namespace": "cloudflow.akkastream.testdata", - "type": "record", - "name": "TestData", - "fields":[ - { - "name": "id", "type": "int" - }, - { - "name": "name", "type": "string" - } - ] -} - diff --git a/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala b/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala deleted file mode 100644 index 9573ac6b0..000000000 --- a/core/cloudflow-akka-testkit/src/test/scala/cloudflow/akkastream/testkit/ReproduceErrors.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright (C) 2016-2020 Lightbend Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cloudflow.akkastream.testkit - -import akka.actor.ActorSystem -import akka.stream.scaladsl.{ RunnableGraph, Source } -import akka.testkit.TestKit -import cloudflow.akkastream.{ AkkaStreamlet, AkkaStreamletLogic } -import cloudflow.akkastream.scaladsl.RunnableGraphStreamletLogic -import cloudflow.streamlets.StreamletShape -import cloudflow.akkastream.testdata.TestData -import cloudflow.akkastream.testkit.scaladsl.{ AkkaStreamletTestKit, Completed } -import cloudflow.streamlets.avro.{ AvroInlet, AvroOutlet } -import org.scalatest._ - -import scala.concurrent.Future - -class ReproduceErrors extends WordSpec with MustMatchers with BeforeAndAfterAll { - private implicit val system = ActorSystem("CloudflowAkkaTestkitErrorReproducerSpec") - - override def afterAll: Unit = - TestKit.shutdownActorSystem(system) - - object TestFixture { - val msgs = List.tabulate(10)(i => TestData(i, i.toString)) - - class TestStreamlet extends AkkaStreamlet { - val in = AvroInlet[TestData]("in") - val out = AvroOutlet[TestData]("out") - - override val shape: StreamletShape = StreamletShape(in).withOutlets(out) - - override protected def createLogic(): AkkaStreamletLogic = new RunnableGraphStreamletLogic() { - override def runnableGraph(): RunnableGraph[_] = { - val write = sinkRef(out).write _ - sourceWithCommittableContext(in) - .mapAsync(parallelism = 1) { element => - Future { - write(element) - } - } - .to(committableSink) - } - } - } - } - - import TestFixture._ - - "Cloudlfow Akka TestKit" should { - "emit a dedicated completed messages after each message emitted via sinkRef.write, but should not" in { - val testkit = AkkaStreamletTestKit(system) - val s = new TestStreamlet() - val in = testkit.inletFromSource(s.in, Source(msgs)) - val out = testkit.outletAsTap(s.out) - - testkit.run( - s, - List(in), - List(out), - () => { - val gotAll = out.probe.receiveN(msgs.size * 2) - val grouped = gotAll.groupBy { - case _: Completed => Completed - case _ => TestData - } - - grouped(TestData) must have size msgs.size - - val resultWithoutIndex = grouped(TestData).asInstanceOf[Seq[(_, TestData)]].map(_._2) - resultWithoutIndex must contain allElementsOf msgs - - grouped(Completed) must have size 1 // but is actually of size msgs.size - } - ) - } - - (0 until 300).foreach { i => - s"maintain the order in which messages are emitted via sinkRef.write (run #$i), but should not" in { - val testkit = AkkaStreamletTestKit(system) - val s = new TestStreamlet() - val in = testkit.inletFromSource(s.in, Source(msgs)) - val out = testkit.outletAsTap(s.out) - - testkit.run( - s, - List(in), - List(out), - () => { - val got = out.probe - .receiveN(msgs.size * 2) - .filter(_ != Completed) // compensate for Completed msg being published after each msg as in upper test case - .map { - case (_, m: TestData) => m - } - got mustEqual msgs - } - ) - } - } - } - -} From becfaaca01292d4475f73fe87f2eb5d6a4e44fa7 Mon Sep 17 00:00:00 2001 From: tsc Date: Sat, 2 Jul 2022 02:41:25 +0200 Subject: [PATCH 5/6] Try to get rid of CRDs in version v1beta1 --- .../kubeclient/KubeClientFabric8.scala | 16 ------------- .../kubeclient/Fabric8KubeClientSpec.scala | 2 +- .../src/main/scala/akka/datap/crd/App.scala | 23 ++++++++++++------- .../main/scala/cloudflow/operator/Main.scala | 4 ++-- .../main/scala/cli/CodepathCoverageMain.scala | 2 +- 5 files changed, 19 insertions(+), 28 deletions(-) diff --git a/core/cloudflow-cli/src/main/scala/akka/cli/cloudflow/kubeclient/KubeClientFabric8.scala b/core/cloudflow-cli/src/main/scala/akka/cli/cloudflow/kubeclient/KubeClientFabric8.scala index eef084983..1b4b7ef21 100644 --- a/core/cloudflow-cli/src/main/scala/akka/cli/cloudflow/kubeclient/KubeClientFabric8.scala +++ b/core/cloudflow-cli/src/main/scala/akka/cli/cloudflow/kubeclient/KubeClientFabric8.scala @@ -100,22 +100,6 @@ class KubeClientFabric8( } } - private def getCrd(name: String, client: KubernetesClient) = { - client - .apiextensions() - .v1beta1() - .customResourceDefinitions() - .inAnyNamespace() - .list() - .getItems() - .asScala - .find { crd => - val crdName = crd.getMetadata.getName - logger.trace(s"Scanning Custom Resources found: ${name}") - crdName == name - } - } - private def getCloudflowApplicationsClient(client: KubernetesClient) = Try { val cloudflowClient = { diff --git a/core/cloudflow-cli/src/test/scala/akka/cli/cloudflow/kubeclient/Fabric8KubeClientSpec.scala b/core/cloudflow-cli/src/test/scala/akka/cli/cloudflow/kubeclient/Fabric8KubeClientSpec.scala index 0ce2f1b44..b45e2f514 100644 --- a/core/cloudflow-cli/src/test/scala/akka/cli/cloudflow/kubeclient/Fabric8KubeClientSpec.scala +++ b/core/cloudflow-cli/src/test/scala/akka/cli/cloudflow/kubeclient/Fabric8KubeClientSpec.scala @@ -28,7 +28,7 @@ class Fabric8KubeClientSpec extends AnyFlatSpec with Matchers with BeforeAndAfte .mkString("\n") server.expect.get - .withPath("/apis/apiextensions.k8s.io/v1beta1/customresourcedefinitions") + .withPath("/apis/apiextensions.k8s.io/v1/customresourcedefinitions") .andReturn( HttpURLConnection.HTTP_OK, Source diff --git a/core/cloudflow-crd/src/main/scala/akka/datap/crd/App.scala b/core/cloudflow-crd/src/main/scala/akka/datap/crd/App.scala index 2c7e8fdb3..aee45be49 100644 --- a/core/cloudflow-crd/src/main/scala/akka/datap/crd/App.scala +++ b/core/cloudflow-crd/src/main/scala/akka/datap/crd/App.scala @@ -13,14 +13,16 @@ import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.JsonDeserializer import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.annotation.JsonDeserialize -import io.fabric8.kubernetes.api.model.apiextensions.v1beta1.CustomResourceDefinitionBuilder +import io.fabric8.kubernetes.api.model.apiextensions.v1.{ + CustomResourceDefinitionBuilder, + CustomResourceDefinitionVersion, + CustomResourceDefinitionVersionBuilder +} import io.fabric8.kubernetes.api.model.{ KubernetesResource, Namespaced, ObjectMeta } import io.fabric8.kubernetes.client.{ CustomResource, CustomResourceList } import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext import io.fabric8.kubernetes.model.annotation.{ Group, Kind, Plural, Version } -import scala.util.Try - object App { // GroupName for our CR @@ -50,6 +52,15 @@ object App { final val ProtocolVersionKey = "protocol-version" final val ProtocolVersion = "7" + val customResourceDefinitionVersion: CustomResourceDefinitionVersion = { + new CustomResourceDefinitionVersionBuilder() + .withName(GroupVersion) + .withNewSubresources() + .withNewStatus() + .endStatus() + .endSubresources() + .build() + } val customResourceDefinitionContext: CustomResourceDefinitionContext = new CustomResourceDefinitionContext.Builder() .withVersion(GroupVersion) @@ -73,13 +84,9 @@ object App { .withPlural(Plural) .withShortNames(Short) .endNames() - .withVersion(GroupVersion) + .withVersions(customResourceDefinitionVersion) .withScope("Namespaced") .withPreserveUnknownFields(true) - .withNewSubresources() - .withNewStatus() - .endStatus() - .endSubresources() .endSpec() .withNewStatus() .withStoredVersions(GroupVersion) diff --git a/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala b/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala index 5e21120fe..f202de3e9 100644 --- a/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala +++ b/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala @@ -129,11 +129,11 @@ object Main extends { Option( client .apiextensions() - .v1beta1() + .v1() .customResourceDefinitions() .withName(App.ResourceName) .get()) match { - case Some(crd) if crd.getSpec.getVersion == App.GroupVersion => + case Some(crd) if crd.getSpec.getVersions().contains(App.GroupVersion) => system.log.info(s"CRD found at version ${App.GroupVersion}") case _ => system.log.error( diff --git a/core/tooling/src/main/scala/cli/CodepathCoverageMain.scala b/core/tooling/src/main/scala/cli/CodepathCoverageMain.scala index 3d1285c27..52c089c6c 100644 --- a/core/tooling/src/main/scala/cli/CodepathCoverageMain.scala +++ b/core/tooling/src/main/scala/cli/CodepathCoverageMain.scala @@ -143,7 +143,7 @@ object CodepathCoverageMain extends App { Serialization .jsonMapper() - .readValue("{}", classOf[io.fabric8.kubernetes.api.model.apiextensions.v1beta1.CustomResourceDefinitionList]) + .readValue("{}", classOf[io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionList]) Serialization .jsonMapper() From 340d5f8e6116ead083c04b6b9008ebbd9e01654e Mon Sep 17 00:00:00 2001 From: tsc Date: Tue, 5 Jul 2022 05:37:27 +0200 Subject: [PATCH 6/6] Fix incorrect check whether version is contained --- .../src/main/scala/cloudflow/operator/Main.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala b/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala index f202de3e9..4a1b95355 100644 --- a/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala +++ b/core/cloudflow-operator/src/main/scala/cloudflow/operator/Main.scala @@ -133,7 +133,7 @@ object Main extends { .customResourceDefinitions() .withName(App.ResourceName) .get()) match { - case Some(crd) if crd.getSpec.getVersions().contains(App.GroupVersion) => + case Some(crd) if crd.getSpec.getVersions().asScala.exists(_.getName == App.GroupVersion) => system.log.info(s"CRD found at version ${App.GroupVersion}") case _ => system.log.error(