Skip to content

Commit

Permalink
feat(sql): log the size of executions when they complete
Browse files Browse the repository at this point in the history
to make changes in size over time more observable.  This makes it easier to see the impact
of features like the artifact store.

Not setting the size in RedisExecutionRepository because the extra
objectMapper.writeValueAsString is potentially expensive and I'm not sure how many folks
are using redis for this.
  • Loading branch information
dbyron-sf committed Apr 15, 2024
1 parent fc072b3 commit 5cda6b2
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -137,6 +138,20 @@ public interface PipelineExecution {

void setPartition(String partition);

/**
* The size of the pipeline execution (not including stages) in units appropriate for the
* implementation (e.g. characters/bytes), if available.
*/
Optional<Long> getSize();

void setSize(long size);

/**
* The total size of the pipeline execution including stages in units appropriate for the
* implementation (e.g. characters/bytes), if available.
*/
Optional<Long> getTotalSize();

// -------

StageExecution namedStage(String type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ public interface StageExecution {

void setAdditionalMetricTags(Map<String, String> additionalMetricTags);

/**
* The size of the stage execution in units appropriate for the implementation (e.g.
* characters/bytes), if available.
*/
Optional<Long> getSize();

void setSize(long size);

// ------------- InternalStageExecution?
// A lot of these methods are used in a single place somewhere in Orca. I don't know why we
// decided to put a bunch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PipelineExecutionImpl implements PipelineExecution, Serializable {

public static final DefaultTrigger NO_TRIGGER = new DefaultTrigger("none");
Expand Down Expand Up @@ -381,6 +383,72 @@ public void setPartition(@Nullable String partition) {
return this.partition;
}

@JsonIgnore private Long size = null;

@Override
public Optional<Long> getSize() {
return Optional.ofNullable(this.size);
}

@Override
public void setSize(long size) {
this.size = size;
}

@JsonIgnore
@Override
public Optional<Long> getTotalSize() {
// Calculate the total size of the execution. Since this is called from
// multiple places, don't assume any of the info is present.
//
// If any of the info is missing, return null rather than potentially
// misleading / too small information.
Optional<Long> thisSize = this.getSize();
if (thisSize.isEmpty()) {
log.debug(
"getTotalSize: application {}, pipeline name: {}, pipeline config id {}, pipeline execution id {}, no size",
this.getApplication(),
this.getName(),
this.getPipelineConfigId(),
this.getId());
return Optional.empty();
}

// See if any stage is missing info
for (StageExecution stage : this.getStages()) {
if (stage.getSize().isEmpty()) {
log.debug(
"getTotalSize: application {}, pipeline name: {}, pipeline config id {}, pipeline execution id {}, stage name: {}, stage id: {}, no size",
this.getApplication(),
this.getName(),
this.getPipelineConfigId(),
this.getId(),
stage.getName(),
stage.getId());
return Optional.empty();
}
}

long totalSize =
thisSize.get()
+ this.getStages().stream()
.mapToLong(
(StageExecution stage) -> {
return stage.getSize().get();
})
.sum();

log.debug(
"getTotalSize: application {}, pipeline name: {}, pipeline config id {}, pipeline execution id {}, total execution size: {}",
this.getApplication(),
this.getName(),
this.getPipelineConfigId(),
this.getId(),
totalSize);

return Optional.of(totalSize);
}

@Nullable
public StageExecution namedStage(String type) {
return stages.stream().filter(it -> it.getType().equals(type)).findFirst().orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,18 @@ public void setScheduledTime(@Nullable Long scheduledTime) {

private LastModifiedDetails lastModified;

@JsonIgnore private Long size = null;

@Override
public Optional<Long> getSize() {
return Optional.ofNullable(this.size);
}

@Override
public void setSize(long size) {
this.size = size;
}

@Nullable
@Override
public StageExecution.LastModifiedDetails getLastModified() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2024 Salesforce, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.pipeline.model;

import static org.assertj.core.api.Assertions.assertThat;

import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class PipelineExecutionImplTest {

private PipelineExecutionImpl pipelineExecution;

private StageExecutionImpl stageExecution;

@BeforeEach
void setup() {
pipelineExecution = new PipelineExecutionImpl(ExecutionType.PIPELINE, "test-application");
stageExecution = new StageExecutionImpl();
stageExecution.setExecution(pipelineExecution);
pipelineExecution.getStages().add(stageExecution);
}

@Test
void getTotalSizeMissingPipelineSize() {
// given
assertThat(pipelineExecution.getSize()).isEmpty();

// then
assertThat(pipelineExecution.getTotalSize()).isEmpty();
}

@Test
void getTotalSizeMissingStageSize() {
// given
long pipelineSize = 14; // arbitrary

pipelineExecution.setSize(pipelineSize);
assertThat(pipelineExecution.getSize()).isPresent();

assertThat(stageExecution.getSize()).isEmpty();

// then
assertThat(pipelineExecution.getTotalSize()).isEmpty();
}

@Test
void getTotalSizeCompleteInfo() {
// given
long pipelineSize = 5; // arbitrary
long stageSize = 7; // arbitrary

pipelineExecution.setSize(pipelineSize);
assertThat(pipelineExecution.getSize()).isPresent();

stageExecution.setSize(stageSize);
assertThat(stageExecution.getSize()).isPresent();

// then
assertThat(pipelineExecution.getTotalSize().get()).isEqualTo(pipelineSize + stageSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import com.netflix.spinnaker.orca.q.CompleteExecution
import com.netflix.spinnaker.orca.q.StartWaitingExecutions
import com.netflix.spinnaker.q.AttemptsAttribute
import com.netflix.spinnaker.q.Queue
import net.logstash.logback.argument.StructuredArguments.kv
import java.time.Duration
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
Expand Down Expand Up @@ -64,6 +65,15 @@ class CompleteExecutionHandler(
message.determineFinalStatus(execution) { status ->
execution.updateStatus(status)
repository.updateStatus(execution)
val executionContextSize = execution.getTotalSize()
if (executionContextSize.isPresent) {
log.info("completed pipeline execution size: {},{},{},{},{}",
kv("application", execution.application),
kv("pipelineName", execution.name),
kv("pipelineConfigId", execution.pipelineConfigId),
kv("pipelineExecutionId", execution.id),
kv("size", executionContextSize.get()))
}
publisher.publishEvent(ExecutionComplete(this, execution))

registry.counter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class ExecutionMapper(
mapper.readValue<PipelineExecution>(body)
.also {
execution ->
execution.setSize(body.length.toLong())
results.add(execution)
execution.partition = rs.getString("partition")

Expand Down Expand Up @@ -120,12 +121,14 @@ class ExecutionMapper(

private fun mapStage(rs: ResultSet, executions: Map<String, PipelineExecution>) {
val executionId = rs.getString("execution_id")
val body = getDecompressedBody(rs)
executions.getValue(executionId)
.stages
.add(
mapper.readValue<StageExecution>(getDecompressedBody(rs))
mapper.readValue<StageExecution>(body)
.apply {
execution = executions.getValue(executionId)
setSize(body.length.toLong())
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,9 @@ class SqlExecutionRepository(
val stageTableName = execution.type.stagesTableName
val status = execution.status.toString()
val body = mapper.writeValueAsString(execution)
val bodySize = body.length.toLong()
execution.setSize(bodySize)
log.debug("application ${execution.application}, pipeline name: ${execution.name}, pipeline config id ${execution.pipelineConfigId}, pipeline execution id ${execution.id}, execution size: ${bodySize}")

val (executionId, legacyId) = mapLegacyId(ctx, tableName, execution.id, execution.startTime)

Expand Down Expand Up @@ -878,6 +881,10 @@ class SqlExecutionRepository(
val stageTable = stage.execution.type.stagesTableName
val table = stage.execution.type.tableName
val body = mapper.writeValueAsString(stage)
val bodySize = body.length.toLong()
stage.setSize(bodySize)
log.debug("application ${stage.execution.application}, pipeline name: ${stage.execution.name}, pipeline config id ${stage.execution.pipelineConfigId}, pipeline execution id ${stage.execution.id}, stage name: ${stage.name}, stage id: ${stage.id}, size: ${bodySize}")

val buildTime = stage.execution.buildTime

val executionUlid = executionId ?: mapLegacyId(ctx, table, stage.execution.id, buildTime).first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,41 @@ class SqlExecutionRepositoryTest : JUnit5Minutests {
}

test("store and retrieve with compression disabled") {
// SqlExecutionRepository.storeExecutionInternal serializes stages
// separately, so do the same here to calculate expected sizes
val beforeStages = pipelineExecution.stages.toList()
pipelineExecution.stages.clear()
val beforePipelineString = orcaObjectMapper.writeValueAsString(pipelineExecution)
pipelineExecution.stages.addAll(beforeStages)
val beforePipelineExecutionSize = beforePipelineString.length.toLong()
val beforeStageString = orcaObjectMapper.writeValueAsString(pipelineExecution.stages.single())
val beforeStageSize = beforeStageString.length.toLong()
val beforeTotalSize = beforePipelineExecutionSize + beforeStageSize

sqlExecutionRepositoryNoCompression.store(pipelineExecution)

val afterStages = pipelineExecution.stages.toList()
pipelineExecution.stages.clear()
val expectedPipelineString = orcaObjectMapper.writeValueAsString(pipelineExecution)
pipelineExecution.stages.addAll(afterStages)
val expectedPipelineExecutionSize = expectedPipelineString.length.toLong()
val expectedStageString = orcaObjectMapper.writeValueAsString(pipelineExecution.stages.single())
val expectedStageSize = expectedStageString.length.toLong()
val expectedTotalSize = expectedPipelineExecutionSize + expectedStageSize

// Make sure the act of storing the pipeline didn't change the
// serialization (e.g. that the size attributes don't get serialized).
assertThat(beforePipelineString).isEqualTo(expectedPipelineString)
assertThat(beforeStageString).isEqualTo(expectedStageString)
assertThat(beforePipelineExecutionSize).isEqualTo(expectedPipelineExecutionSize);
assertThat(beforeStageSize).isEqualTo(expectedStageSize);
assertThat(beforeTotalSize).isEqualTo(expectedTotalSize);

// And make sure the size is correct
assertThat(pipelineExecution.size.get()).isEqualTo(expectedPipelineExecutionSize)
assertThat(pipelineExecution.stages.single().size.get()).isEqualTo(expectedStageSize)
assertThat(pipelineExecution.totalSize.get()).isEqualTo(expectedTotalSize)

val numCompressedExecutions = database.context.fetchCount(testTable.compressedExecTable)
assertThat(numCompressedExecutions).isEqualTo(0)

Expand All @@ -186,6 +219,11 @@ class SqlExecutionRepositoryTest : JUnit5Minutests {

val actualPipelineExecution = sqlExecutionRepositoryNoCompression.retrieve(testType, pipelineId)
assertThat(actualPipelineExecution).isEqualTo(pipelineExecution)

// Make sure is calculated on retrieve as well
assertThat(actualPipelineExecution.size.get()).isEqualTo(expectedPipelineExecutionSize)
assertThat(actualPipelineExecution.stages.single().size.get()).isEqualTo(expectedStageSize)
assertThat(actualPipelineExecution.totalSize.get()).isEqualTo(expectedTotalSize)
}

test("store compressed, retrieve with compression disabled") {
Expand Down

0 comments on commit 5cda6b2

Please sign in to comment.