From 1db8c2550526733641cf74b05580279627eb14d8 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Fri, 13 Sep 2024 07:30:21 -0400 Subject: [PATCH 1/3] feat(retry backoff): Adding retry with backoff --- Package.swift | 2 +- Sources/JobsRedis/RedisJobQueue.swift | 52 +++++++++++++++++++++-- Tests/JobsRedisTests/RedisJobsTests.swift | 41 +++++++++++++++++- 3 files changed, 90 insertions(+), 5 deletions(-) diff --git a/Package.swift b/Package.swift index 4c134ca..126f288 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( .library(name: "JobsRedis", targets: ["JobsRedis"]), ], dependencies: [ - .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"), + .package(url: "https://github.com/thoven87/swift-jobs.git", branch: "stevenson/retry-backoff"), .package(url: "https://github.com/swift-server/RediStack.git", from: "1.4.0"), ], targets: [ diff --git a/Sources/JobsRedis/RedisJobQueue.swift b/Sources/JobsRedis/RedisJobQueue.swift index d5575c1..871d6f2 100644 --- a/Sources/JobsRedis/RedisJobQueue.swift +++ b/Sources/JobsRedis/RedisJobQueue.swift @@ -32,6 +32,11 @@ public final class RedisJobQueue: JobQueueDriver { self.delayUntil = Self.toMilliseconds(value: delayUntil?.timeIntervalSince1970) } + init(_ job: JobID, delayUntil: Date?) { + self.id = job.id + self.delayUntil = Self.toMilliseconds(value: delayUntil?.timeIntervalSince1970) + } + /// Initialize JobID from String /// - Parameter value: string value public init(_ value: String) { @@ -66,16 +71,23 @@ public final class RedisJobQueue: JobQueueDriver { public var description: String { "\(self.id):\(self.delayUntil)" } + + /// Computed property used for wildcard matching since a key is in the following format + /// JobID:delayUntil + public var pattern: String { .init("\(self.id):*") } } public enum RedisQueueError: Error, CustomStringConvertible { case unexpectedRedisKeyType case jobMissing(JobID) + case unexpectedMultipleRedisKeysForJob(JobID) public var description: String { switch self { case .unexpectedRedisKeyType: return "Unexpected redis key type" + case .unexpectedMultipleRedisKeysForJob(let value): + return "Unexpected multiple redis keys for job id: \(value.id)" case .jobMissing(let value): return "Job associated with \(value) is missing" } @@ -113,11 +125,24 @@ public final class RedisJobQueue: JobQueueDriver { /// - Returns: Queued job @discardableResult public func push(_ buffer: ByteBuffer, options: JobOptions) async throws -> JobID { let jobInstanceID = JobID(delayUntil: options.delayUntil) + try await self.addToQueue(jobInstanceID, buffer: buffer) + return jobInstanceID + } - try await self.set(jobId: jobInstanceID, buffer: buffer) + private func addToQueue(_ jobId: JobID, buffer: ByteBuffer) async throws { + try await self.set(jobId: jobId, buffer: buffer) + _ = try await self.redisConnectionPool.wrappedValue.lpush(jobId.redisKey, into: self.configuration.queueKey).get() + } - _ = try await self.redisConnectionPool.wrappedValue.lpush(jobInstanceID.redisKey, into: self.configuration.queueKey).get() - return jobInstanceID + /// Retries failed job currently processing upto maxRetrycount + /// - Parameters: + /// - jobId: JobID + /// - data: Job data + /// - Returns: Queued job + public func retry(jobId: JobID, buffer: NIOCore.ByteBuffer, options: Jobs.JobOptions) async throws { + let newJobId = JobID(jobId, delayUntil: options.delayUntil) + try await self.cleanupRetryingJob(jobId: jobId) + try await self.addToQueue(newJobId, buffer: buffer) } /// Flag job is done @@ -130,6 +155,27 @@ public final class RedisJobQueue: JobQueueDriver { try await self.delete(jobId: jobId) } + private func cleanupRetryingJob(jobId: JobID) async throws { + let (count, prevJobId) = try await self.redisConnectionPool.wrappedValue.scan( + startingFrom: 0, + matching: jobId.pattern + ).get() + // This case should never happend, because there should only be one + // unique key for a given job instance. Guarding against just to be sure + if count > 0 { + throw RedisQueueError.unexpectedMultipleRedisKeysForJob(jobId) + } + + guard let prevJobId = prevJobId.first else { + return + } + + let actualJobId = JobID(prevJobId) + + _ = try await self.redisConnectionPool.wrappedValue.lrem(actualJobId.redisKey, from: self.configuration.processingQueueKey, count: 0).get() + _ = try await self.redisConnectionPool.wrappedValue.delete(actualJobId.redisKey).get() + } + /// Flag job failed to process /// /// Removes job id from processing queue, adds to failed queue diff --git a/Tests/JobsRedisTests/RedisJobsTests.swift b/Tests/JobsRedisTests/RedisJobsTests.swift index 21afcc1..db28e89 100644 --- a/Tests/JobsRedisTests/RedisJobsTests.swift +++ b/Tests/JobsRedisTests/RedisJobsTests.swift @@ -74,7 +74,12 @@ final class RedisJobsTests: XCTestCase { ) ), numWorkers: numWorkers, - logger: logger + logger: logger, + options: .init( + maximumBackoff: 0.01, + maxJitter: 0.01, + minJitter: 0.0 + ) ) return try await withThrowingTaskGroup(of: Void.self) { group in @@ -176,6 +181,40 @@ final class RedisJobsTests: XCTestCase { } } + func testErrorRetryAndThenSucceed() async throws { + let jobIdentifer = JobIdentifier(#function) + let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 2) + let currentJobTryCount: NIOLockedValueBox = .init(0) + struct FailedError: Error {} + try await self.testJobQueue(numWorkers: 1) { jobQueue in + jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in + defer { + currentJobTryCount.withLockedValue { + $0 += 1 + } + } + expectation.fulfill() + if currentJobTryCount.withLockedValue({ $0 }) == 0 { + throw FailedError() + } + } + try await jobQueue.push(id: jobIdentifer, parameters: 0) + + await self.fulfillment(of: [expectation], timeout: 5) + try await Task.sleep(for: .milliseconds(200)) + + let failedJobs = try await jobQueue.queue.redisConnectionPool.wrappedValue.llen(of: jobQueue.queue.configuration.failedQueueKey).get() + XCTAssertEqual(failedJobs, 0) + + let pendingJobs = try await jobQueue.queue.redisConnectionPool.wrappedValue.llen(of: jobQueue.queue.configuration.queueKey).get() + XCTAssertEqual(pendingJobs, 0) + + let processingJobs = try await jobQueue.queue.redisConnectionPool.wrappedValue.llen(of: jobQueue.queue.configuration.processingQueueKey).get() + XCTAssertEqual(processingJobs, 0) + } + XCTAssertEqual(currentJobTryCount.withLockedValue { $0 }, 2) + } + func testJobSerialization() async throws { struct TestJobParameters: Codable { let id: Int From e0553c99cc14d1252550a0afc39be3aaae5cd1bb Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Fri, 13 Sep 2024 13:13:15 -0400 Subject: [PATCH 2/3] dropped retry func --- Sources/JobsRedis/RedisJobQueue.swift | 44 --------------------------- 1 file changed, 44 deletions(-) diff --git a/Sources/JobsRedis/RedisJobQueue.swift b/Sources/JobsRedis/RedisJobQueue.swift index 871d6f2..f749a1e 100644 --- a/Sources/JobsRedis/RedisJobQueue.swift +++ b/Sources/JobsRedis/RedisJobQueue.swift @@ -32,11 +32,6 @@ public final class RedisJobQueue: JobQueueDriver { self.delayUntil = Self.toMilliseconds(value: delayUntil?.timeIntervalSince1970) } - init(_ job: JobID, delayUntil: Date?) { - self.id = job.id - self.delayUntil = Self.toMilliseconds(value: delayUntil?.timeIntervalSince1970) - } - /// Initialize JobID from String /// - Parameter value: string value public init(_ value: String) { @@ -71,23 +66,16 @@ public final class RedisJobQueue: JobQueueDriver { public var description: String { "\(self.id):\(self.delayUntil)" } - - /// Computed property used for wildcard matching since a key is in the following format - /// JobID:delayUntil - public var pattern: String { .init("\(self.id):*") } } public enum RedisQueueError: Error, CustomStringConvertible { case unexpectedRedisKeyType case jobMissing(JobID) - case unexpectedMultipleRedisKeysForJob(JobID) public var description: String { switch self { case .unexpectedRedisKeyType: return "Unexpected redis key type" - case .unexpectedMultipleRedisKeysForJob(let value): - return "Unexpected multiple redis keys for job id: \(value.id)" case .jobMissing(let value): return "Job associated with \(value) is missing" } @@ -134,17 +122,6 @@ public final class RedisJobQueue: JobQueueDriver { _ = try await self.redisConnectionPool.wrappedValue.lpush(jobId.redisKey, into: self.configuration.queueKey).get() } - /// Retries failed job currently processing upto maxRetrycount - /// - Parameters: - /// - jobId: JobID - /// - data: Job data - /// - Returns: Queued job - public func retry(jobId: JobID, buffer: NIOCore.ByteBuffer, options: Jobs.JobOptions) async throws { - let newJobId = JobID(jobId, delayUntil: options.delayUntil) - try await self.cleanupRetryingJob(jobId: jobId) - try await self.addToQueue(newJobId, buffer: buffer) - } - /// Flag job is done /// /// Removes job id from processing queue @@ -155,27 +132,6 @@ public final class RedisJobQueue: JobQueueDriver { try await self.delete(jobId: jobId) } - private func cleanupRetryingJob(jobId: JobID) async throws { - let (count, prevJobId) = try await self.redisConnectionPool.wrappedValue.scan( - startingFrom: 0, - matching: jobId.pattern - ).get() - // This case should never happend, because there should only be one - // unique key for a given job instance. Guarding against just to be sure - if count > 0 { - throw RedisQueueError.unexpectedMultipleRedisKeysForJob(jobId) - } - - guard let prevJobId = prevJobId.first else { - return - } - - let actualJobId = JobID(prevJobId) - - _ = try await self.redisConnectionPool.wrappedValue.lrem(actualJobId.redisKey, from: self.configuration.processingQueueKey, count: 0).get() - _ = try await self.redisConnectionPool.wrappedValue.delete(actualJobId.redisKey).get() - } - /// Flag job failed to process /// /// Removes job id from processing queue, adds to failed queue From ff6616e27f2ad458cdc459283ee27722f44eb368 Mon Sep 17 00:00:00 2001 From: Stevenson Michel <130018170+thoven87@users.noreply.github.com> Date: Sat, 14 Sep 2024 11:42:00 -0400 Subject: [PATCH 3/3] use main branch --- Package.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Package.swift b/Package.swift index 126f288..4c134ca 100644 --- a/Package.swift +++ b/Package.swift @@ -10,7 +10,7 @@ let package = Package( .library(name: "JobsRedis", targets: ["JobsRedis"]), ], dependencies: [ - .package(url: "https://github.com/thoven87/swift-jobs.git", branch: "stevenson/retry-backoff"), + .package(url: "https://github.com/hummingbird-project/swift-jobs.git", branch: "main"), .package(url: "https://github.com/swift-server/RediStack.git", from: "1.4.0"), ], targets: [