Skip to content

Commit

Permalink
Merge branch 'spinnaker:master' into remove-retrofitError-Front50Service
Browse files Browse the repository at this point in the history
  • Loading branch information
Pranav-b-7 committed Apr 1, 2024
2 parents 29643d8 + 886e2b1 commit 4c6aa53
Show file tree
Hide file tree
Showing 70 changed files with 1,840 additions and 297 deletions.
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
fiatVersion=1.43.0
korkVersion=7.211.0
fiatVersion=1.44.0
korkVersion=7.221.0
kotlinVersion=1.5.32
org.gradle.parallel=true
org.gradle.jvmargs=-Xmx6g
Expand Down
26 changes: 13 additions & 13 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class SqlQueue(
override val publisher: EventPublisher,
private val sqlRetryProperties: SqlRetryProperties,
private val ULID: ULID = ULID(),
private val poolName: String = "default"
private val poolName: String = "default",
private val containsMessageBatchSize: Int = 100,
) : MonitorableQueue {

companion object {
Expand Down Expand Up @@ -187,33 +188,32 @@ class SqlQueue(
}

private fun doContainsMessage(predicate: (Message) -> Boolean): Boolean {
val batchSize = 100
val batchSize = containsMessageBatchSize
var found = false
var lastId = "0"

do {
val rs: ResultSet = withRetry(READ) {
val rs = withRetry(READ) {
jooq.select(idField, fingerprintField, bodyField)
.from(messagesTable)
.where(idField.gt(lastId))
.orderBy(idField.asc())
.limit(batchSize)
.fetch()
.intoResultSet()
}

while (!found && rs.next()) {
val rsIterator = rs.iterator()
while (!found && rsIterator.hasNext()) {
val record = rsIterator.next()
val body = record[bodyField, String::class.java]
try {
found = predicate.invoke(mapper.readValue(rs.getString("body")))
found = predicate.invoke(mapper.readValue(body))
} catch (e: Exception) {
log.error(
"Failed reading message with fingerprint: ${rs.getString("fingerprint")} " +
"message: ${rs.getString("body")}",
e
)
log.error("Failed reading message with fingerprint: ${record[fingerprintField, String::class.java]} message: $body", e)
}
lastId = rs.getString("id")
lastId = record[idField, String::class.java]
}
} while (!found && rs.row == batchSize)
} while (!found && rs.isNotEmpty)

return found
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import com.netflix.spinnaker.q.TestMessage
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.metrics.MonitorableQueueTest
import com.netflix.spinnaker.q.metrics.QueueEvent
import com.netflix.spinnaker.time.MutableClock
import com.nhaarman.mockito_kotlin.mock
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.BeforeEach
import java.time.Clock
import java.time.Duration
import java.util.Optional
Expand All @@ -37,7 +43,8 @@ private val createQueueNoPublisher = { clock: Clock,

private fun createQueue(clock: Clock,
deadLetterCallback: DeadMessageCallback,
publisher: EventPublisher?): SqlQueue {
publisher: EventPublisher?,
containsMessageBatchSize: Int = 5): SqlQueue {
return SqlQueue(
queueName = "test",
schemaVersion = 1,
Expand Down Expand Up @@ -66,7 +73,8 @@ private fun createQueue(clock: Clock,
sqlRetryProperties = SqlRetryProperties(
transactions = retryPolicy,
reads = retryPolicy
)
),
containsMessageBatchSize = containsMessageBatchSize,
)
}

Expand All @@ -78,3 +86,49 @@ private val retryPolicy: RetryProperties = RetryProperties(
maxRetries = 1,
backoffMs = 10 // minimum allowed
)

class SqlQueueSpecificTests {
private val batchSize = 5
private val clock = MutableClock()
private val deadMessageHandler: DeadMessageCallback = mock()
private val publisher: EventPublisher = mock()
private var queue: SqlQueue? = null

@BeforeEach
fun setup() {
queue = createQueue(clock, deadMessageHandler, publisher, batchSize)
}

@AfterEach
fun cleanup() {
cleanupCallback()
}

@Test
fun `doContainsMessage works with no messages present`() {
assertThat(doContainsMessagePayload("test")).isFalse
}

@Test
fun `doContainsMessage works with a single batch`() {
pushTestMessages(batchSize)
assertThat(doContainsMessagePayload("${batchSize-1}")).isTrue
assertThat(doContainsMessagePayload("")).isFalse
}

@Test
fun `doContainsMessage handles multiple batches during search`() {
pushTestMessages(batchSize * 2)
assertThat(doContainsMessagePayload("${batchSize+1}")).isTrue
assertThat(doContainsMessagePayload("")).isFalse
}

private fun pushTestMessages(numberOfMessages: Int) {
for (i in 1 .. numberOfMessages) {
queue?.push(TestMessage(i.toString()))
}
}

private fun doContainsMessagePayload(payload: String): Boolean? =
queue?.containsMessage { message -> message is TestMessage && message.payload == payload }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package com.netflix.spinnaker.orca.applications.tasks

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.kork.retrofit.exceptions.SpinnakerHttpException
import com.netflix.spinnaker.kork.retrofit.exceptions.SpinnakerServerException
import com.netflix.spinnaker.orca.api.pipeline.TaskResult
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus
import com.netflix.spinnaker.orca.front50.Front50Service
Expand Down Expand Up @@ -93,6 +95,15 @@ class DeleteApplicationTask extends AbstractFront50Task {
}
log.error("Could not delete application", e)
return TaskResult.builder(ExecutionStatus.TERMINAL).outputs(outputs).build()
} catch (SpinnakerHttpException httpException){
if (httpException.responseCode == 404) {
return TaskResult.SUCCEEDED
}
log.error("Could not delete application", httpException)
return TaskResult.builder(ExecutionStatus.TERMINAL).outputs(outputs).build()
} catch (SpinnakerServerException serverException) {
log.error("Could not delete application", serverException)
return TaskResult.builder(ExecutionStatus.TERMINAL).outputs(outputs).build()
}
return TaskResult.builder(ExecutionStatus.SUCCEEDED).outputs(outputs).build()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2024 Harness, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.clouddriver.tasks.servergroup;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "server-group")
public class ServerGroupProperties {

private Resize resize = new Resize();

public static class Resize {
private boolean matchInstancesSize;

public void setMatchInstancesSize(boolean matchInstancesSize) {
this.matchInstancesSize = matchInstancesSize;
}

public boolean isMatchInstancesSize() {
return matchInstancesSize;
}
}

public void setResize(Resize resize) {
this.resize = resize;
}

public Resize getResize() {
return resize;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright 2014 Netflix, Inc.
* Copyright 2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -27,6 +27,12 @@
@Component
public class WaitForCapacityMatchTask extends AbstractInstancesCheckTask {

private final ServerGroupProperties serverGroupProperties;

public WaitForCapacityMatchTask(ServerGroupProperties serverGroupProperties) {
this.serverGroupProperties = serverGroupProperties;
}

@Override
protected Map<String, List<String>> getServerGroups(StageExecution stage) {
return (Map<String, List<String>>) stage.getContext().get("deploy.server.groups");
Expand Down Expand Up @@ -88,27 +94,37 @@ protected boolean hasSucceeded(
desired = capacity.getDesired();
}

Integer targetDesiredSize =
Optional.ofNullable((Number) context.get("targetDesiredSize"))
.map(Number::intValue)
.orElse(null);

splainer.add(
String.format(
"checking if capacity matches (desired=%s, target=%s current=%s)",
desired, targetDesiredSize == null ? "none" : targetDesiredSize, instances.size()));
if (targetDesiredSize != null && targetDesiredSize != 0) {
// `targetDesiredSize` is derived from `targetHealthyDeployPercentage` and if present,
// then scaling has succeeded if the number of instances is greater than this value.
if (instances.size() < targetDesiredSize) {
if (serverGroupProperties.getResize().isMatchInstancesSize()) {
splainer.add(
"checking if capacity matches (desired=${desired}, instances.size()=${instances.size()}) ");
if (desired == null || desired != instances.size()) {
splainer.add(
"short-circuiting out of WaitForCapacityMatchTask because targetDesired and current capacity don't match");
"short-circuiting out of WaitForCapacityMatchTask because expected and current capacity don't match}");
return false;
}
} else if (desired == null || desired != instances.size()) {
} else {
Integer targetDesiredSize =
Optional.ofNullable((Number) context.get("targetDesiredSize"))
.map(Number::intValue)
.orElse(null);

splainer.add(
"short-circuiting out of WaitForCapacityMatchTask because expected and current capacity don't match");
return false;
String.format(
"checking if capacity matches (desired=%s, target=%s current=%s)",
desired, targetDesiredSize == null ? "none" : targetDesiredSize, instances.size()));
if (targetDesiredSize != null && targetDesiredSize != 0) {
// `targetDesiredSize` is derived from `targetHealthyDeployPercentage` and if present,
// then scaling has succeeded if the number of instances is greater than this value.
if (instances.size() < targetDesiredSize) {
splainer.add(
"short-circuiting out of WaitForCapacityMatchTask because targetDesired and current capacity don't match");
return false;
}
} else if (desired == null || desired != instances.size()) {
splainer.add(
"short-circuiting out of WaitForCapacityMatchTask because expected and current capacity don't match");
return false;
}
}

boolean disabled = Boolean.TRUE.equals(serverGroup.getDisabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import spock.lang.Unroll
class WaitForCapacityMatchTaskSpec extends Specification {

CloudDriverService cloudDriverService = Mock()
@Subject WaitForCapacityMatchTask task = new WaitForCapacityMatchTask() {
@Subject WaitForCapacityMatchTask task = new WaitForCapacityMatchTask(new ServerGroupProperties()) {
@Override
void verifyServerGroupsExist(StageExecution stage) {
// do nothing
Expand Down Expand Up @@ -264,6 +264,60 @@ class WaitForCapacityMatchTaskSpec extends Specification {
true || 4 | [min: 3, max: 10, desired: 4] | [min: "1", max: "50", desired: "5"]
}
@Unroll
void 'should use number of instances when determining if scaling has succeeded even if targetHealthyDeployPercentage is defined'() {
def serverGroupProperties = new ServerGroupProperties()
def resize = new ServerGroupProperties.Resize()
resize.setMatchInstancesSize(true)
serverGroupProperties.setResize(resize)
WaitForCapacityMatchTask task = new WaitForCapacityMatchTask(serverGroupProperties) {
@Override
void verifyServerGroupsExist(StageExecution stage) {
// do nothing
}
}
when:
def context = [
capacity: [
min: configured.min,
max: configured.max,
desired: configured.desired
],
targetHealthyDeployPercentage: targetHealthyDeployPercentage,
targetDesiredSize: targetHealthyDeployPercentage
? Math.round(targetHealthyDeployPercentage * configured.desired / 100) : null
]
def serverGroup = ModelUtils.serverGroup([
asg: [
desiredCapacity: asg.desired
],
capacity: [
min: asg.min,
max: asg.max,
desired: asg.desired
]
])
def instances = []
(1..healthy).each {
instances << ModelUtils.instance([health: [[state: 'Up']]])
}
then:
result == task.hasSucceeded(
new StageExecutionImpl(PipelineExecutionImpl.newPipeline("orca"), "", "", context),
serverGroup, instances, null
)
where:
result || healthy | asg | configured | targetHealthyDeployPercentage
false || 5 | [min: 10, max: 15, desired: 15] | [min: 10, max: 15, desired: 15] | 85
false || 12 | [min: 10, max: 15, desired: 15] | [min: 10, max: 15, desired: 15] | 85
false || 13 | [min: 10, max: 15, desired: 15] | [min: 10, max: 15, desired: 15] | 85
true || 15 | [min: 10, max: 15, desired: 15] | [min: 10, max: 15, desired: 15] | 100
}
private static Instance makeInstance(id, healthState = 'Up') {
ModelUtils.instance([instanceId: id, health: [ [ state: healthState ] ]])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public OkHttpClient.Builder get(@NotNull ServiceEndpoint service) {

ObjectMapper objectMapper = new ObjectMapper();
RestAdapter.LogLevel logLevel = RestAdapter.LogLevel.FULL;
RequestInterceptor requestInterceptor = new SpinnakerRequestInterceptor(null);
RequestInterceptor requestInterceptor = new SpinnakerRequestInterceptor(true);

this.clouddriverRetrofitBuilder =
new CloudDriverConfiguration.ClouddriverRetrofitBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.netflix.spectator.api.NoopRegistry;
import com.netflix.spinnaker.config.DeploymentMonitorDefinition;
import com.netflix.spinnaker.kork.test.log.MemoryAppender;
import com.netflix.spinnaker.okhttp.OkHttpClientConfigurationProperties;
import com.netflix.spinnaker.okhttp.SpinnakerRequestInterceptor;
import com.netflix.spinnaker.orca.api.pipeline.TaskResult;
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus;
Expand Down Expand Up @@ -119,7 +118,7 @@ void setup(TestInfo testInfo) {
new DeploymentMonitorServiceProvider(
okClient,
retrofitLogLevel,
new SpinnakerRequestInterceptor(new OkHttpClientConfigurationProperties()),
new SpinnakerRequestInterceptor(true),
deploymentMonitorDefinitions);
evaluateDeploymentHealthTask =
new EvaluateDeploymentHealthTask(deploymentMonitorServiceProvider, new NoopRegistry());
Expand Down
Loading

0 comments on commit 4c6aa53

Please sign in to comment.