From 8f2ecda19df27eaaa1e070a2eb5300aa58587e56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthieu=20Barth=C3=A9lemy?= Date: Fri, 17 Apr 2020 10:24:41 +0800 Subject: [PATCH] Update for Queues 1.0.0 and custom JobIdentifier --- Package.swift | 4 +-- README.md | 2 +- Sources/QueuesFluentDriver/FluentQueue.swift | 31 +++++++------------ Sources/QueuesFluentDriver/JobModel.swift | 23 +++++++------- .../QueuesFluentDriver/JobModelMigrate.swift | 20 ++++++++++-- .../PopQueries/MySQLPopQuery.swift | 20 +++++++----- .../PopQueries/PopQueryProtocol.swift | 2 +- .../PopQueries/PostgresPopQuery.swift | 10 +++--- .../PopQueries/SqlitePopQuery.swift | 15 ++++----- .../SQLExpressionExtensions.swift | 4 ++- 10 files changed, 72 insertions(+), 59 deletions(-) diff --git a/Package.swift b/Package.swift index 8857d3d..8bd4182 100644 --- a/Package.swift +++ b/Package.swift @@ -14,10 +14,10 @@ let package = Package( targets: ["QueuesFluentDriver"]), ], dependencies: [ - .package(url: "https://github.com/vapor/vapor.git", from: "4.0.1"), + .package(url: "https://github.com/vapor/vapor.git", from: "4.0.0"), .package(url: "https://github.com/vapor/fluent.git", from: "4.0.0-rc.2"), .package(url: "https://github.com/vapor/sql-kit.git", from: "3.0.0-rc"), - .package(url: "https://github.com/vapor/queues.git", from: "1.0.0-rc"), + .package(url: "https://github.com/vapor/queues.git", from: "1.0.0"), ], targets: [ .target( diff --git a/README.md b/README.md index 76ad031..de38179 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ let package = Package( ... dependencies: [ ... - .package(url: "https://github.com/m-barthelemy/vapor-queues-fluent-driver.git", from: "0.2.4"), + .package(url: "https://github.com/m-barthelemy/vapor-queues-fluent-driver.git", from: "0.3"), ... ], targets: [ diff --git a/Sources/QueuesFluentDriver/FluentQueue.swift b/Sources/QueuesFluentDriver/FluentQueue.swift index a50a1aa..8ccb570 100644 --- a/Sources/QueuesFluentDriver/FluentQueue.swift +++ b/Sources/QueuesFluentDriver/FluentQueue.swift @@ -15,11 +15,8 @@ extension FluentQueue: Queue { guard let database = db else { return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound) } - guard let uuid = UUID(uuidString: id.string) else { - return database.eventLoop.makeFailedFuture(QueuesFluentError.invalidIdentifier) - } return database.query(JobModel.self) - .filter(\.$id == uuid) + .filter(\.$jobId == id.string) .first() .unwrap(or: QueuesFluentError.missingJob(id)) .flatMapThrowing { job in @@ -32,24 +29,19 @@ extension FluentQueue: Queue { guard let database = db else { return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound) } - guard let uuid = UUID(uuidString: id.string) else { - return database.eventLoop.makeFailedFuture(QueuesFluentError.invalidIdentifier) - } //let data = try! JSONEncoder().encode(jobStorage) - return JobModel(id: uuid, key: key, data: jobStorage).save(on: database) - //.map { return } + return JobModel(jobId: id.string, queue: queueName.string, data: jobStorage) + .save(on: database) } func clear(_ id: JobIdentifier) -> EventLoopFuture { guard let database = db else { return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound) } - guard let uuid = UUID(uuidString: id.string) else { - return database.eventLoop.makeFailedFuture(QueuesFluentError.invalidIdentifier) - } // This does the equivalent of a Fluent Softdelete but sets the `state` to `completed` return database.query(JobModel.self) - .filter(\.$id == uuid) + .filter(\.$jobId == id.string) + .filter(\.$state != QueuesFluentJobState.completed) .first() .unwrap(or: QueuesFluentError.missingJob(id)) .flatMap { job in @@ -67,17 +59,15 @@ extension FluentQueue: Queue { guard let database = db else { return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound) } - guard let uuid = UUID(uuidString: id.string) else { - return database.eventLoop.makeFailedFuture(QueuesFluentError.invalidIdentifier) - } let sqlDb = database as! SQLDatabase return sqlDb .update(JobModel.schema) .set (SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.pending)) - .where (SQLColumn("\(FieldKey.id)"), .equal, SQLBind(uuid)) + .where (SQLColumn("\(FieldKey.jobId)"), .equal, SQLBind(id.string)) .run() } + /// Currently selects the oldest job pending execution func pop() -> EventLoopFuture { guard let database = db else { return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound) @@ -86,9 +76,10 @@ extension FluentQueue: Queue { var selectQuery = db .select () - .column ("\(FieldKey.id)") + .column ("\(FieldKey.jobId)") .from (JobModel.schema) .where ("\(FieldKey.state)", .equal, SQLBind(QueuesFluentJobState.pending)) + .where ("\(FieldKey.queue)", .equal, SQLBind(self.queueName.string)) .orderBy("\(FieldKey.createdAt)") .limit (1) if self.dbType != .sqlite { @@ -105,7 +96,7 @@ extension FluentQueue: Queue { popProvider = SqlitePop() } return popProvider.pop(db: database, select: selectQuery.query).optionalMap { id in - return JobIdentifier(string: id.uuidString) + return JobIdentifier(string: id) } } @@ -120,7 +111,7 @@ extension FluentQueue: Queue { .from (JobModel.schema) .where ("\(FieldKey.state)", .equal, SQLBind(state)) if let queue = queue { - query = query.where("\(FieldKey.key)", .equal, SQLBind(queue)) + query = query.where("\(FieldKey.queue)", .equal, SQLBind(queue)) } if self.dbType != .sqlite { query = query.lockingClause(SQLSkipLocked.forShareSkipLocked) diff --git a/Sources/QueuesFluentDriver/JobModel.swift b/Sources/QueuesFluentDriver/JobModel.swift index 792569d..00bd3e5 100644 --- a/Sources/QueuesFluentDriver/JobModel.swift +++ b/Sources/QueuesFluentDriver/JobModel.swift @@ -3,7 +3,7 @@ import Fluent import Queues public enum QueuesFluentJobState: String, Codable, CaseIterable { - /// Ready to be oicked up for execution + /// Ready to be picked up for execution case pending case processing /// Executed, regardless if it was successful or not @@ -11,7 +11,8 @@ public enum QueuesFluentJobState: String, Codable, CaseIterable { } extension FieldKey { - static var key: Self { "key" } + static var jobId: Self { "job_id" } + static var queue: Self { "queue" } static var data: Self { "data" } static var state: Self { "state" } @@ -23,16 +24,18 @@ extension FieldKey { class JobModel: Model { public required init() {} - /// Properties - public static var schema = "jobs" + public static var schema = "_jobs" /// The unique Job uuid @ID(key: .id) var id: UUID? + @Field(key: .jobId) + var jobId: String? + /// The Job key - @Field(key: .key) - var key: String + @Field(key: .queue) + var queue: String /// The Job data @Field(key: .data) @@ -43,11 +46,9 @@ class JobModel: Model { @Field(key: .state) var state: QueuesFluentJobState - /// The created timestamp @Timestamp(key: .createdAt, on: .create) var createdAt: Date? - /// The updated timestamp @Timestamp(key: .updatedAt, on: .update) var updatedAt: Date? @@ -55,9 +56,9 @@ class JobModel: Model { var deletedAt: Date? - init(id: UUID, key: String, data: JobData) { - self.id = id - self.key = key + init(jobId: String, queue: String, data: JobData) { + self.jobId = jobId + self.queue = queue self.data = try! JSONEncoder().encode(data) self.state = .pending } diff --git a/Sources/QueuesFluentDriver/JobModelMigrate.swift b/Sources/QueuesFluentDriver/JobModelMigrate.swift index cd3d07a..b9cdbb8 100644 --- a/Sources/QueuesFluentDriver/JobModelMigrate.swift +++ b/Sources/QueuesFluentDriver/JobModelMigrate.swift @@ -12,7 +12,8 @@ public struct JobModelMigrate: Migration { public func prepare(on database: Database) -> EventLoopFuture { return database.schema(JobModel.schema) .id() - .field(FieldKey.key, .string, .required) + .field(FieldKey.jobId, .string, .required) + .field(FieldKey.queue, .string, .required) .field(FieldKey.data, .data, .required) .field(FieldKey.state, .string, .required) .field(FieldKey.createdAt, .datetime) @@ -20,12 +21,25 @@ public struct JobModelMigrate: Migration { .field(FieldKey.deletedAt, .datetime) .create() .flatMap { - // Mysql could lock the entire table if there's no index on the field of the WHERE clause + // Mysql could lock the entire table if there's no index on the fields of the WHERE clause. + // Since we have both state and queue in the WHERE clause to retrieve pending jobs, + // both need to be part of a composite index. + // Order of the fields in the composite index and order of the fields in the WHERE clauses should match. + // Or I got totally confused reading their doc, which is also a possibility. + // Postgres seems to not be so sensitive and should be happy with the following indices. let sqlDb = database as! SQLDatabase - return sqlDb.create(index: "i_\(JobModel.schema)_\(FieldKey.state)") + let stateIndex = sqlDb.create(index: "i_\(JobModel.schema)_\(FieldKey.state)_\(FieldKey.queue)") .on(JobModel.schema) .column("\(FieldKey.state)") + .column("\(FieldKey.queue)") .run() + let jobIdIndex = sqlDb.create(index: "i_\(JobModel.schema)_\(FieldKey.jobId)") + .on(JobModel.schema) + .column("\(FieldKey.jobId)") + .run() + return stateIndex.and(jobIdIndex).map { indices in + return + } } } diff --git a/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift index 2ccc630..bffade0 100644 --- a/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift +++ b/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift @@ -4,21 +4,25 @@ import Fluent import Queues final class MySQLPop : PopQueryProtocol { - func pop(db: Database, select: SQLExpression) -> EventLoopFuture { + // MySQL is a bit challenging since it doesn't support updating a table that is + // used in a subquery. + // So we first select, then update, with the whole process wrapped in a transaction. + func pop(db: Database, select: SQLExpression) -> EventLoopFuture { db.transaction { transaction in let database = transaction as! SQLDatabase - - var id: UUID? + var id: String? + return database.execute(sql: select) { (row) -> Void in - id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self) + id = try? row.decode(column: "\(FieldKey.jobId)", as: String.self) } .flatMap { if let id = id { let updateQuery = database - .update(JobModel.schema) - .set(SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing)) - .set(SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date())) - .where(SQLColumn.init("\(FieldKey.id)"), .equal, SQLBind.init(id)) + .update (JobModel.schema) + .set (SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing)) + .set (SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date())) + .where (SQLColumn.init("\(FieldKey.jobId)"), .equal, SQLBind.init(id)) + .where (SQLColumn.init("\(FieldKey.state)"), .equal, SQLBind.init(QueuesFluentJobState.pending)) .query return database.execute(sql: updateQuery) { (row) in } .map { id } diff --git a/Sources/QueuesFluentDriver/PopQueries/PopQueryProtocol.swift b/Sources/QueuesFluentDriver/PopQueries/PopQueryProtocol.swift index eef5074..01d4b41 100644 --- a/Sources/QueuesFluentDriver/PopQueries/PopQueryProtocol.swift +++ b/Sources/QueuesFluentDriver/PopQueries/PopQueryProtocol.swift @@ -3,5 +3,5 @@ import SQLKit import Fluent protocol PopQueryProtocol { - func pop(db: Database, select: SQLExpression) -> EventLoopFuture + func pop(db: Database, select: SQLExpression) -> EventLoopFuture } diff --git a/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift index 8c8b417..ba8113a 100644 --- a/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift +++ b/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift @@ -4,7 +4,7 @@ import Fluent import Queues final class PostgresPop : PopQueryProtocol { - func pop(db: Database, select: SQLExpression) -> EventLoopFuture { + func pop(db: Database, select: SQLExpression) -> EventLoopFuture { let database = db as! SQLDatabase let subQueryGroup = SQLGroupExpression.init(select) let query = database @@ -12,15 +12,15 @@ final class PostgresPop : PopQueryProtocol { .set (SQLColumn("\(FieldKey.state)"), to: SQLBind(QueuesFluentJobState.processing)) .set (SQLColumn("\(FieldKey.updatedAt)"), to: SQLBind(Date())) .where ( - SQLBinaryExpression(left: SQLColumn("\(FieldKey.id)"), op: SQLBinaryOperator.equal , right: subQueryGroup) + SQLBinaryExpression(left: SQLColumn("\(FieldKey.jobId)"), op: SQLBinaryOperator.equal , right: subQueryGroup) ) // Gross abuse - .orWhere(SQLReturning.returning(column: FieldKey.id)) + .orWhere(SQLReturning.returning(column: FieldKey.jobId)) .query - var id: UUID? + var id: String? return database.execute(sql: query) { (row) -> Void in - id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self) + id = try? row.decode(column: "\(FieldKey.jobId)", as: String.self) }.map { return id } diff --git a/Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift index df6975f..7341d62 100644 --- a/Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift +++ b/Sources/QueuesFluentDriver/PopQueries/SqlitePopQuery.swift @@ -4,22 +4,23 @@ import Fluent import Queues final class SqlitePop : PopQueryProtocol { - func pop(db: Database, select: SQLExpression) -> EventLoopFuture { + func pop(db: Database, select: SQLExpression) -> EventLoopFuture { let database = db as! SQLDatabase //let beginImmediateTrxn = database.raw("BEGIN IMMEDIATE"). //return database.raw(SQLQueryString("BEGIN IMMEDIATE")).run().flatMap { void in - var id: UUID? + var id: String? return database.execute(sql: select) { (row) -> Void in - id = try? row.decode(column: "\(FieldKey.id)", as: UUID.self) + id = try? row.decode(column: "\(FieldKey.jobId)", as: String.self) } .flatMap { if let id = id { let updateQuery = database - .update(JobModel.schema) - .set(SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing)) - .set(SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date())) - .where(SQLColumn.init("\(FieldKey.id)"), .equal, SQLBind.init(id)) + .update (JobModel.schema) + .set (SQLColumn.init("\(FieldKey.state)"), to: SQLBind.init(QueuesFluentJobState.processing)) + .set (SQLColumn.init("\(FieldKey.updatedAt)"), to: SQLBind.init(Date())) + .where (SQLColumn.init("\(FieldKey.jobId)"), .equal, SQLBind.init(id)) + .where (SQLColumn.init("\(FieldKey.state)"), .equal, SQLBind.init(QueuesFluentJobState.pending)) .query return database.execute(sql: updateQuery) { (row) in } .flatMap { diff --git a/Sources/QueuesFluentDriver/SQLExpressionExtensions.swift b/Sources/QueuesFluentDriver/SQLExpressionExtensions.swift index c3f13bb..af5a926 100644 --- a/Sources/QueuesFluentDriver/SQLExpressionExtensions.swift +++ b/Sources/QueuesFluentDriver/SQLExpressionExtensions.swift @@ -10,6 +10,7 @@ enum SQLSkipLocked: SQLExpression { case .forUpdateSkipLocked: serializer.write("FOR UPDATE SKIP LOCKED") case .forShareSkipLocked: + // This is the "lightest" locking that is supported by both Postgres and Mysql serializer.write("FOR SHARE SKIP LOCKED") } } @@ -26,7 +27,8 @@ enum SQLReturning: SQLExpression { case .returningAll: serializer.write("1=2 RETURNING *") case .returning(let column): - serializer.write("1=2 RETURNING \"\(column.description)\"") + serializer.write("1=2 RETURNING ") + SQLColumn(column.description).serialize(to: &serializer) } } }