Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sql): log the size of executions when they complete #4660

Merged
merged 2 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -137,6 +138,20 @@ public interface PipelineExecution {

void setPartition(String partition);

/**
* The size of the pipeline execution (not including stages) in units appropriate for the
* implementation (e.g. characters/bytes), if available.
*/
Optional<Long> getSize();

void setSize(long size);

/**
* The total size of the pipeline execution including stages in units appropriate for the
* implementation (e.g. characters/bytes), if available.
*/
Optional<Long> getTotalSize();

// -------

StageExecution namedStage(String type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ public interface StageExecution {

void setAdditionalMetricTags(Map<String, String> additionalMetricTags);

/**
* The size of the stage execution in units appropriate for the implementation (e.g.
* characters/bytes), if available.
*/
Optional<Long> getSize();

void setSize(long size);

// ------------- InternalStageExecution?
// A lot of these methods are used in a single place somewhere in Orca. I don't know why we
// decided to put a bunch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PipelineExecutionImpl implements PipelineExecution, Serializable {

public static final DefaultTrigger NO_TRIGGER = new DefaultTrigger("none");
Expand Down Expand Up @@ -381,6 +383,72 @@ public void setPartition(@Nullable String partition) {
return this.partition;
}

@JsonIgnore private Long size = null;

@Override
public Optional<Long> getSize() {
return Optional.ofNullable(this.size);
}

@Override
public void setSize(long size) {
this.size = size;
}

@JsonIgnore
@Override
public Optional<Long> getTotalSize() {
// Calculate the total size of the execution. Since this is called from
// multiple places, don't assume any of the info is present.
//
// If any of the info is missing, return null rather than potentially
// misleading / too small information.
Optional<Long> thisSize = this.getSize();
if (thisSize.isEmpty()) {
log.debug(
"getTotalSize: application {}, pipeline name: {}, pipeline config id {}, pipeline execution id {}, no size",
this.getApplication(),
this.getName(),
this.getPipelineConfigId(),
this.getId());
return Optional.empty();
}

// See if any stage is missing info
for (StageExecution stage : this.getStages()) {
if (stage.getSize().isEmpty()) {
log.debug(
"getTotalSize: application {}, pipeline name: {}, pipeline config id {}, pipeline execution id {}, stage name: {}, stage id: {}, no size",
this.getApplication(),
this.getName(),
this.getPipelineConfigId(),
this.getId(),
stage.getName(),
stage.getId());
return Optional.empty();
}
}

long totalSize =
thisSize.get()
+ this.getStages().stream()
.mapToLong(
(StageExecution stage) -> {
return stage.getSize().get();
})
.sum();

log.debug(
"getTotalSize: application {}, pipeline name: {}, pipeline config id {}, pipeline execution id {}, total execution size: {}",
this.getApplication(),
this.getName(),
this.getPipelineConfigId(),
this.getId(),
totalSize);

return Optional.of(totalSize);
}

@Nullable
public StageExecution namedStage(String type) {
return stages.stream().filter(it -> it.getType().equals(type)).findFirst().orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,18 @@ public void setScheduledTime(@Nullable Long scheduledTime) {

private LastModifiedDetails lastModified;

@JsonIgnore private Long size = null;

@Override
public Optional<Long> getSize() {
return Optional.ofNullable(this.size);
}

@Override
public void setSize(long size) {
this.size = size;
}

@Nullable
@Override
public StageExecution.LastModifiedDetails getLastModified() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2024 Salesforce, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.pipeline.model;

import static org.assertj.core.api.Assertions.assertThat;

import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class PipelineExecutionImplTest {

private PipelineExecutionImpl pipelineExecution;

private StageExecutionImpl stageExecution;

@BeforeEach
void setup() {
pipelineExecution = new PipelineExecutionImpl(ExecutionType.PIPELINE, "test-application");
stageExecution = new StageExecutionImpl();
stageExecution.setExecution(pipelineExecution);
pipelineExecution.getStages().add(stageExecution);
}

@Test
void getTotalSizeMissingPipelineSize() {
// given
assertThat(pipelineExecution.getSize()).isEmpty();

// then
assertThat(pipelineExecution.getTotalSize()).isEmpty();
}

@Test
void getTotalSizeMissingStageSize() {
// given
long pipelineSize = 14; // arbitrary

pipelineExecution.setSize(pipelineSize);
assertThat(pipelineExecution.getSize()).isPresent();

assertThat(stageExecution.getSize()).isEmpty();

// then
assertThat(pipelineExecution.getTotalSize()).isEmpty();
}

@Test
void getTotalSizeCompleteInfo() {
// given
long pipelineSize = 5; // arbitrary
long stageSize = 7; // arbitrary

pipelineExecution.setSize(pipelineSize);
assertThat(pipelineExecution.getSize()).isPresent();

stageExecution.setSize(stageSize);
assertThat(stageExecution.getSize()).isPresent();

// then
assertThat(pipelineExecution.getTotalSize().get()).isEqualTo(pipelineSize + stageSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import com.netflix.spinnaker.orca.q.CompleteExecution
import com.netflix.spinnaker.orca.q.StartWaitingExecutions
import com.netflix.spinnaker.q.AttemptsAttribute
import com.netflix.spinnaker.q.Queue
import net.logstash.logback.argument.StructuredArguments.kv
import java.time.Duration
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
Expand Down Expand Up @@ -64,6 +65,15 @@ class CompleteExecutionHandler(
message.determineFinalStatus(execution) { status ->
execution.updateStatus(status)
repository.updateStatus(execution)
val executionContextSize = execution.getTotalSize()
if (executionContextSize.isPresent) {
log.info("completed pipeline execution size: {},{},{},{},{}",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something we've learned after living with this for awhile...this isn't necessarily the size of a pipeline execution once spinnaker is totally done with it. As in, things happen to pipeline executions after this. There's logic below to cancel stages of pipelines that didn't succeed, though we've also seen some succeeded pipelines change after this. We're still looking into it.

This log message still seems useful if not as an absolute number, at least as a way to measure change. As in, before/after of the artifact store, or other code changes that influence the size of the execution context.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd absolutely go for the log message!

kv("application", execution.application),
kv("pipelineName", execution.name),
kv("pipelineConfigId", execution.pipelineConfigId),
kv("pipelineExecutionId", execution.id),
kv("size", executionContextSize.get()))
}
publisher.publishEvent(ExecutionComplete(this, execution))

registry.counter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class ExecutionMapper(
mapper.readValue<PipelineExecution>(body)
.also {
execution ->
execution.setSize(body.length.toLong())
results.add(execution)
execution.partition = rs.getString("partition")

Expand Down Expand Up @@ -120,12 +121,14 @@ class ExecutionMapper(

private fun mapStage(rs: ResultSet, executions: Map<String, PipelineExecution>) {
val executionId = rs.getString("execution_id")
val body = getDecompressedBody(rs)
executions.getValue(executionId)
.stages
.add(
mapper.readValue<StageExecution>(getDecompressedBody(rs))
mapper.readValue<StageExecution>(body)
.apply {
execution = executions.getValue(executionId)
setSize(body.length.toLong())
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,9 @@ class SqlExecutionRepository(
val stageTableName = execution.type.stagesTableName
val status = execution.status.toString()
val body = mapper.writeValueAsString(execution)
val bodySize = body.length.toLong()
execution.setSize(bodySize)
log.debug("application ${execution.application}, pipeline name: ${execution.name}, pipeline config id ${execution.pipelineConfigId}, pipeline execution id ${execution.id}, execution size: ${bodySize}")

val (executionId, legacyId) = mapLegacyId(ctx, tableName, execution.id, execution.startTime)

Expand Down Expand Up @@ -878,6 +881,10 @@ class SqlExecutionRepository(
val stageTable = stage.execution.type.stagesTableName
val table = stage.execution.type.tableName
val body = mapper.writeValueAsString(stage)
val bodySize = body.length.toLong()
stage.setSize(bodySize)
log.debug("application ${stage.execution.application}, pipeline name: ${stage.execution.name}, pipeline config id ${stage.execution.pipelineConfigId}, pipeline execution id ${stage.execution.id}, stage name: ${stage.name}, stage id: ${stage.id}, size: ${bodySize}")

val buildTime = stage.execution.buildTime

val executionUlid = executionId ?: mapLegacyId(ctx, table, stage.execution.id, buildTime).first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,41 @@ class SqlExecutionRepositoryTest : JUnit5Minutests {
}

test("store and retrieve with compression disabled") {
// SqlExecutionRepository.storeExecutionInternal serializes stages
// separately, so do the same here to calculate expected sizes
val beforeStages = pipelineExecution.stages.toList()
pipelineExecution.stages.clear()
val beforePipelineString = orcaObjectMapper.writeValueAsString(pipelineExecution)
pipelineExecution.stages.addAll(beforeStages)
val beforePipelineExecutionSize = beforePipelineString.length.toLong()
val beforeStageString = orcaObjectMapper.writeValueAsString(pipelineExecution.stages.single())
val beforeStageSize = beforeStageString.length.toLong()
val beforeTotalSize = beforePipelineExecutionSize + beforeStageSize

sqlExecutionRepositoryNoCompression.store(pipelineExecution)

val afterStages = pipelineExecution.stages.toList()
pipelineExecution.stages.clear()
val expectedPipelineString = orcaObjectMapper.writeValueAsString(pipelineExecution)
pipelineExecution.stages.addAll(afterStages)
val expectedPipelineExecutionSize = expectedPipelineString.length.toLong()
val expectedStageString = orcaObjectMapper.writeValueAsString(pipelineExecution.stages.single())
val expectedStageSize = expectedStageString.length.toLong()
val expectedTotalSize = expectedPipelineExecutionSize + expectedStageSize

// Make sure the act of storing the pipeline didn't change the
// serialization (e.g. that the size attributes don't get serialized).
assertThat(beforePipelineString).isEqualTo(expectedPipelineString)
assertThat(beforeStageString).isEqualTo(expectedStageString)
assertThat(beforePipelineExecutionSize).isEqualTo(expectedPipelineExecutionSize);
assertThat(beforeStageSize).isEqualTo(expectedStageSize);
assertThat(beforeTotalSize).isEqualTo(expectedTotalSize);

// And make sure the size is correct
assertThat(pipelineExecution.size.get()).isEqualTo(expectedPipelineExecutionSize)
assertThat(pipelineExecution.stages.single().size.get()).isEqualTo(expectedStageSize)
assertThat(pipelineExecution.totalSize.get()).isEqualTo(expectedTotalSize)

val numCompressedExecutions = database.context.fetchCount(testTable.compressedExecTable)
assertThat(numCompressedExecutions).isEqualTo(0)

Expand All @@ -186,6 +219,11 @@ class SqlExecutionRepositoryTest : JUnit5Minutests {

val actualPipelineExecution = sqlExecutionRepositoryNoCompression.retrieve(testType, pipelineId)
assertThat(actualPipelineExecution).isEqualTo(pipelineExecution)

// Make sure is calculated on retrieve as well
assertThat(actualPipelineExecution.size.get()).isEqualTo(expectedPipelineExecutionSize)
assertThat(actualPipelineExecution.stages.single().size.get()).isEqualTo(expectedStageSize)
assertThat(actualPipelineExecution.totalSize.get()).isEqualTo(expectedTotalSize)
}

test("store compressed, retrieve with compression disabled") {
Expand Down