From 5cda6b228d5cebb6a51246db7d06143d76fc627d Mon Sep 17 00:00:00 2001 From: David Byron Date: Mon, 5 Feb 2024 14:58:26 -0800 Subject: [PATCH] feat(sql): log the size of executions when they complete to make changes in size over time more observable. This makes it easier to see the impact of features like the artifact store. Not setting the size in RedisExecutionRepository because the extra objectMapper.writeValueAsString is potentially expensive and I'm not sure how many folks are using redis for this. --- .../pipeline/models/PipelineExecution.java | 15 ++++ .../api/pipeline/models/StageExecution.java | 8 ++ .../pipeline/model/PipelineExecutionImpl.java | 68 ++++++++++++++++ .../pipeline/model/StageExecutionImpl.java | 12 +++ .../model/PipelineExecutionImplTest.java | 77 +++++++++++++++++++ .../q/handler/CompleteExecutionHandler.kt | 10 +++ .../pipeline/persistence/ExecutionMapper.kt | 5 +- .../persistence/SqlExecutionRepository.kt | 7 ++ .../persistence/SqlExecutionRepositoryTest.kt | 38 +++++++++ 9 files changed, 239 insertions(+), 1 deletion(-) create mode 100644 orca-core/src/test/java/com/netflix/spinnaker/orca/pipeline/model/PipelineExecutionImplTest.java diff --git a/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/PipelineExecution.java b/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/PipelineExecution.java index 72fb480dcd..d7ba723c1c 100644 --- a/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/PipelineExecution.java +++ b/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/PipelineExecution.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -137,6 +138,20 @@ public interface PipelineExecution { void setPartition(String partition); + /** + * The size of the pipeline execution (not including stages) in units appropriate for the + * implementation (e.g. characters/bytes), if available. + */ + Optional getSize(); + + void setSize(long size); + + /** + * The total size of the pipeline execution including stages in units appropriate for the + * implementation (e.g. characters/bytes), if available. + */ + Optional getTotalSize(); + // ------- StageExecution namedStage(String type); diff --git a/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java b/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java index 94c42db167..e8692bbc83 100644 --- a/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java +++ b/orca-api/src/main/java/com/netflix/spinnaker/orca/api/pipeline/models/StageExecution.java @@ -128,6 +128,14 @@ public interface StageExecution { void setAdditionalMetricTags(Map additionalMetricTags); + /** + * The size of the stage execution in units appropriate for the implementation (e.g. + * characters/bytes), if available. + */ + Optional getSize(); + + void setSize(long size); + // ------------- InternalStageExecution? // A lot of these methods are used in a single place somewhere in Orca. I don't know why we // decided to put a bunch diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/PipelineExecutionImpl.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/PipelineExecutionImpl.java index 34430668f3..1748ab315e 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/PipelineExecutionImpl.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/PipelineExecutionImpl.java @@ -46,7 +46,9 @@ import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class PipelineExecutionImpl implements PipelineExecution, Serializable { public static final DefaultTrigger NO_TRIGGER = new DefaultTrigger("none"); @@ -381,6 +383,72 @@ public void setPartition(@Nullable String partition) { return this.partition; } + @JsonIgnore private Long size = null; + + @Override + public Optional getSize() { + return Optional.ofNullable(this.size); + } + + @Override + public void setSize(long size) { + this.size = size; + } + + @JsonIgnore + @Override + public Optional getTotalSize() { + // Calculate the total size of the execution. Since this is called from + // multiple places, don't assume any of the info is present. + // + // If any of the info is missing, return null rather than potentially + // misleading / too small information. + Optional thisSize = this.getSize(); + if (thisSize.isEmpty()) { + log.debug( + "getTotalSize: application {}, pipeline name: {}, pipeline config id {}, pipeline execution id {}, no size", + this.getApplication(), + this.getName(), + this.getPipelineConfigId(), + this.getId()); + return Optional.empty(); + } + + // See if any stage is missing info + for (StageExecution stage : this.getStages()) { + if (stage.getSize().isEmpty()) { + log.debug( + "getTotalSize: application {}, pipeline name: {}, pipeline config id {}, pipeline execution id {}, stage name: {}, stage id: {}, no size", + this.getApplication(), + this.getName(), + this.getPipelineConfigId(), + this.getId(), + stage.getName(), + stage.getId()); + return Optional.empty(); + } + } + + long totalSize = + thisSize.get() + + this.getStages().stream() + .mapToLong( + (StageExecution stage) -> { + return stage.getSize().get(); + }) + .sum(); + + log.debug( + "getTotalSize: application {}, pipeline name: {}, pipeline config id {}, pipeline execution id {}, total execution size: {}", + this.getApplication(), + this.getName(), + this.getPipelineConfigId(), + this.getId(), + totalSize); + + return Optional.of(totalSize); + } + @Nullable public StageExecution namedStage(String type) { return stages.stream().filter(it -> it.getType().equals(type)).findFirst().orElse(null); diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java index 276725ccf1..16595f9b3f 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/StageExecutionImpl.java @@ -324,6 +324,18 @@ public void setScheduledTime(@Nullable Long scheduledTime) { private LastModifiedDetails lastModified; + @JsonIgnore private Long size = null; + + @Override + public Optional getSize() { + return Optional.ofNullable(this.size); + } + + @Override + public void setSize(long size) { + this.size = size; + } + @Nullable @Override public StageExecution.LastModifiedDetails getLastModified() { diff --git a/orca-core/src/test/java/com/netflix/spinnaker/orca/pipeline/model/PipelineExecutionImplTest.java b/orca-core/src/test/java/com/netflix/spinnaker/orca/pipeline/model/PipelineExecutionImplTest.java new file mode 100644 index 0000000000..629b043287 --- /dev/null +++ b/orca-core/src/test/java/com/netflix/spinnaker/orca/pipeline/model/PipelineExecutionImplTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2024 Salesforce, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.pipeline.model; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class PipelineExecutionImplTest { + + private PipelineExecutionImpl pipelineExecution; + + private StageExecutionImpl stageExecution; + + @BeforeEach + void setup() { + pipelineExecution = new PipelineExecutionImpl(ExecutionType.PIPELINE, "test-application"); + stageExecution = new StageExecutionImpl(); + stageExecution.setExecution(pipelineExecution); + pipelineExecution.getStages().add(stageExecution); + } + + @Test + void getTotalSizeMissingPipelineSize() { + // given + assertThat(pipelineExecution.getSize()).isEmpty(); + + // then + assertThat(pipelineExecution.getTotalSize()).isEmpty(); + } + + @Test + void getTotalSizeMissingStageSize() { + // given + long pipelineSize = 14; // arbitrary + + pipelineExecution.setSize(pipelineSize); + assertThat(pipelineExecution.getSize()).isPresent(); + + assertThat(stageExecution.getSize()).isEmpty(); + + // then + assertThat(pipelineExecution.getTotalSize()).isEmpty(); + } + + @Test + void getTotalSizeCompleteInfo() { + // given + long pipelineSize = 5; // arbitrary + long stageSize = 7; // arbitrary + + pipelineExecution.setSize(pipelineSize); + assertThat(pipelineExecution.getSize()).isPresent(); + + stageExecution.setSize(stageSize); + assertThat(stageExecution.getSize()).isPresent(); + + // then + assertThat(pipelineExecution.getTotalSize().get()).isEqualTo(pipelineSize + stageSize); + } +} diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteExecutionHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteExecutionHandler.kt index 6dab3de24b..02d5372460 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteExecutionHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteExecutionHandler.kt @@ -36,6 +36,7 @@ import com.netflix.spinnaker.orca.q.CompleteExecution import com.netflix.spinnaker.orca.q.StartWaitingExecutions import com.netflix.spinnaker.q.AttemptsAttribute import com.netflix.spinnaker.q.Queue +import net.logstash.logback.argument.StructuredArguments.kv import java.time.Duration import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier @@ -64,6 +65,15 @@ class CompleteExecutionHandler( message.determineFinalStatus(execution) { status -> execution.updateStatus(status) repository.updateStatus(execution) + val executionContextSize = execution.getTotalSize() + if (executionContextSize.isPresent) { + log.info("completed pipeline execution size: {},{},{},{},{}", + kv("application", execution.application), + kv("pipelineName", execution.name), + kv("pipelineConfigId", execution.pipelineConfigId), + kv("pipelineExecutionId", execution.id), + kv("size", executionContextSize.get())) + } publisher.publishEvent(ExecutionComplete(this, execution)) registry.counter( diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt index cce03cfdab..ebe5d3abc2 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt @@ -77,6 +77,7 @@ class ExecutionMapper( mapper.readValue(body) .also { execution -> + execution.setSize(body.length.toLong()) results.add(execution) execution.partition = rs.getString("partition") @@ -120,12 +121,14 @@ class ExecutionMapper( private fun mapStage(rs: ResultSet, executions: Map) { val executionId = rs.getString("execution_id") + val body = getDecompressedBody(rs) executions.getValue(executionId) .stages .add( - mapper.readValue(getDecompressedBody(rs)) + mapper.readValue(body) .apply { execution = executions.getValue(executionId) + setSize(body.length.toLong()) } ) } diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 7183b06011..33575f5d30 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -789,6 +789,9 @@ class SqlExecutionRepository( val stageTableName = execution.type.stagesTableName val status = execution.status.toString() val body = mapper.writeValueAsString(execution) + val bodySize = body.length.toLong() + execution.setSize(bodySize) + log.debug("application ${execution.application}, pipeline name: ${execution.name}, pipeline config id ${execution.pipelineConfigId}, pipeline execution id ${execution.id}, execution size: ${bodySize}") val (executionId, legacyId) = mapLegacyId(ctx, tableName, execution.id, execution.startTime) @@ -878,6 +881,10 @@ class SqlExecutionRepository( val stageTable = stage.execution.type.stagesTableName val table = stage.execution.type.tableName val body = mapper.writeValueAsString(stage) + val bodySize = body.length.toLong() + stage.setSize(bodySize) + log.debug("application ${stage.execution.application}, pipeline name: ${stage.execution.name}, pipeline config id ${stage.execution.pipelineConfigId}, pipeline execution id ${stage.execution.id}, stage name: ${stage.name}, stage id: ${stage.id}, size: ${bodySize}") + val buildTime = stage.execution.buildTime val executionUlid = executionId ?: mapLegacyId(ctx, table, stage.execution.id, buildTime).first diff --git a/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt b/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt index e48209448d..9d03c3c7af 100644 --- a/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt +++ b/orca-sql/src/test/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositoryTest.kt @@ -170,8 +170,41 @@ class SqlExecutionRepositoryTest : JUnit5Minutests { } test("store and retrieve with compression disabled") { + // SqlExecutionRepository.storeExecutionInternal serializes stages + // separately, so do the same here to calculate expected sizes + val beforeStages = pipelineExecution.stages.toList() + pipelineExecution.stages.clear() + val beforePipelineString = orcaObjectMapper.writeValueAsString(pipelineExecution) + pipelineExecution.stages.addAll(beforeStages) + val beforePipelineExecutionSize = beforePipelineString.length.toLong() + val beforeStageString = orcaObjectMapper.writeValueAsString(pipelineExecution.stages.single()) + val beforeStageSize = beforeStageString.length.toLong() + val beforeTotalSize = beforePipelineExecutionSize + beforeStageSize + sqlExecutionRepositoryNoCompression.store(pipelineExecution) + val afterStages = pipelineExecution.stages.toList() + pipelineExecution.stages.clear() + val expectedPipelineString = orcaObjectMapper.writeValueAsString(pipelineExecution) + pipelineExecution.stages.addAll(afterStages) + val expectedPipelineExecutionSize = expectedPipelineString.length.toLong() + val expectedStageString = orcaObjectMapper.writeValueAsString(pipelineExecution.stages.single()) + val expectedStageSize = expectedStageString.length.toLong() + val expectedTotalSize = expectedPipelineExecutionSize + expectedStageSize + + // Make sure the act of storing the pipeline didn't change the + // serialization (e.g. that the size attributes don't get serialized). + assertThat(beforePipelineString).isEqualTo(expectedPipelineString) + assertThat(beforeStageString).isEqualTo(expectedStageString) + assertThat(beforePipelineExecutionSize).isEqualTo(expectedPipelineExecutionSize); + assertThat(beforeStageSize).isEqualTo(expectedStageSize); + assertThat(beforeTotalSize).isEqualTo(expectedTotalSize); + + // And make sure the size is correct + assertThat(pipelineExecution.size.get()).isEqualTo(expectedPipelineExecutionSize) + assertThat(pipelineExecution.stages.single().size.get()).isEqualTo(expectedStageSize) + assertThat(pipelineExecution.totalSize.get()).isEqualTo(expectedTotalSize) + val numCompressedExecutions = database.context.fetchCount(testTable.compressedExecTable) assertThat(numCompressedExecutions).isEqualTo(0) @@ -186,6 +219,11 @@ class SqlExecutionRepositoryTest : JUnit5Minutests { val actualPipelineExecution = sqlExecutionRepositoryNoCompression.retrieve(testType, pipelineId) assertThat(actualPipelineExecution).isEqualTo(pipelineExecution) + + // Make sure is calculated on retrieve as well + assertThat(actualPipelineExecution.size.get()).isEqualTo(expectedPipelineExecutionSize) + assertThat(actualPipelineExecution.stages.single().size.get()).isEqualTo(expectedStageSize) + assertThat(actualPipelineExecution.totalSize.get()).isEqualTo(expectedTotalSize) } test("store compressed, retrieve with compression disabled") {