Skip to content

Commit

Permalink
feat(queue): Add queue shovel for migrating backends (#1624)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert authored Sep 19, 2017
1 parent 65e322c commit c57a06a
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.config

import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.orca.config.RedisConfiguration
import com.netflix.spinnaker.orca.q.redis.RedisDeadMessageHandler
import com.netflix.spinnaker.orca.q.redis.RedisQueue
import com.netflix.spinnaker.orca.q.QueueShovel
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import redis.clients.jedis.Jedis
import redis.clients.util.Pool
import java.time.Clock
import java.time.Duration

@Configuration
@ConditionalOnExpression("\${queue.redis.enabled:true}")
class RedisQueueShovelConfiguration {

@Bean(name = arrayOf("previousQueueJedisPool")) open fun previousQueueJedisPool(
@Value("\${redis.connection:redis://localhost:6379}") mainConnection: String,
@Value("\${redis.connectionPrevious:#{null}}") previousConnection: String?,
@Value("\${redis.timeout:2000}") timeout: Int,
redisPoolConfig: GenericObjectPoolConfig,
registry: Registry): Pool<Jedis>? {
if (mainConnection == previousConnection || previousConnection == null) {
return null
}

return RedisConfiguration.createPool(redisPoolConfig, previousConnection, timeout, registry, "previousQueueJedisPool")
}

@Bean(name = arrayOf("previousQueueImpl")) open fun previousRedisQueue(
@Qualifier("previousQueueJedisPool") redisPool: Pool<Jedis>,
redisQueueProperties: RedisQueueProperties,
clock: Clock,
deadMessageHandler: RedisDeadMessageHandler,
publisher: ApplicationEventPublisher
) =
RedisQueue(
queueName = redisQueueProperties.queueName,
pool = redisPool,
clock = clock,
deadMessageHandler = deadMessageHandler::handle,
publisher = publisher,
ackTimeout = Duration.ofSeconds(redisQueueProperties.ackTimeoutSeconds.toLong())
)


@Bean()
@ConditionalOnBean(name = arrayOf("previousQueueJedisPool")) open fun redisQueueShovel(
@Qualifier("queueImpl") queueImpl: RedisQueue,
@Qualifier("previousQueueImpl") previousQueueImpl: RedisQueue,
registry: Registry
) =
QueueShovel(
queue = queueImpl,
previousQueue = previousQueueImpl,
registry = registry
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.orca.q

import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.orca.discovery.DiscoveryActivated
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.scheduling.annotation.Scheduled
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.PostConstruct


/**
* The QueueShovel can be used to migrate from one queue implementation to another without an
* operator needing to perform any substantial external work.
*
* In the case of a RedisQueue, when a previous Redis connection is configured, this shovel
* would be wired up to move messages off the old Redis server and onto the new one as the
* messages become available for processing.
*/
class QueueShovel(
private val queue: Queue,
private val previousQueue: Queue,
private val registry: Registry
) : DiscoveryActivated {

override val log: Logger = LoggerFactory.getLogger(javaClass)
override val enabled = AtomicBoolean(false)

private val pollOpsRateId = registry.createId("orca.nu.shovel.pollOpsRate")
private val shoveledMessageId = registry.createId("orca.nu.shovel.pushedMessageRate")
private val shovelErrorId = registry.createId("orca.nu.shovel.pushedMessageErrorRate")

@Scheduled(fixedDelayString = "\${queue.shovel.pollFrequency.ms:500}")
fun migrateOne() {
ifEnabled {
registry.counter(pollOpsRateId).increment()
previousQueue.poll { message, ack ->
try {
queue.push(message)
ack.invoke()
registry.counter(shoveledMessageId).increment()
} catch (e: Throwable) {
log.error("Failed shoveling message from previous queue to active (message: {})", message, e)
registry.counter(shovelErrorId).increment()
}
}
}
}

@PostConstruct
fun confirmShovelUsage() =
log.info("Running ${javaClass.simpleName} migrator")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2017 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.orca.q

import com.netflix.spectator.api.NoopRegistry
import com.netflix.spinnaker.orca.pipeline.model.Pipeline
import com.nhaarman.mockito_kotlin.*
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.jetbrains.spek.api.lifecycle.CachingMode
import org.jetbrains.spek.subject.SubjectSpek

class QueueShovelTest : SubjectSpek<QueueShovel>({

val queue: Queue = mock()
val previousQueue: Queue = mock()
val registry = NoopRegistry()

subject(CachingMode.GROUP) {
QueueShovel(queue, previousQueue, registry)
}

describe("polling the previous queue") {
val message = StartExecution(Pipeline::class.java, "1", "spinnaker")

beforeGroup {
subject.enabled.set(true)
whenever(previousQueue.poll(any())) doAnswer {
it.getArgument<QueueCallback>(0)(message, {})
}
}

on("the shovel poll method is invoked") {
subject.migrateOne()
}

it("pushes the message onto the current queue") {
verify(queue).push(message)
}
}
})

0 comments on commit c57a06a

Please sign in to comment.