diff --git a/Sources/QueuesFluentDriver/FluentQueue.swift b/Sources/QueuesFluentDriver/FluentQueue.swift index 560b9df..2cd79e7 100644 --- a/Sources/QueuesFluentDriver/FluentQueue.swift +++ b/Sources/QueuesFluentDriver/FluentQueue.swift @@ -8,10 +8,10 @@ struct FluentQueue { let context: QueueContext let dbType: QueuesFluentDbType let useSoftDeletes: Bool = true + static let model = JobModel(id: UUID.generateRandom(), key: "") } extension FluentQueue: Queue { - static let model = JobModel(id: UUID.generateRandom(), key: "") func get(_ id: JobIdentifier) -> EventLoopFuture { guard let database = db else { @@ -103,70 +103,23 @@ extension FluentQueue: Queue { .where("\(Self.model.$state.key)", SQLBinaryOperator.equal, SQLBind.init(JobState.pending)) .orderBy("\(Self.model.$createdAt.path.first!)") .limit (1) - if (self.dbType != .sqlite) { selectQuery = selectQuery.lockingClause(SQLForUpdateSkipLocked.forUpdateSkipLocked) } - let subQueryGroup = SQLGroupExpression.init(selectQuery.query) - let query = db - .update(JobModel.schema) - //.set("\(Self.model.$state.key)", to: JobState.processing) - .set(SQLColumn.init("\(Self.model.$state.key)"), to: SQLBind.init(JobState.processing)) - //.set("\(Self.model.$updatedAt.path.first!)", to: Date()) - .set(SQLColumn.init("\(Self.model.$updatedAt.path.first!)"), to: SQLBind.init(Date())) - .where( - SQLBinaryExpression(left: SQLColumn("\(Self.model.$id.key)"), op: SQLBinaryOperator.equal , right: subQueryGroup) - ) - // Gross abuse - .orWhere(SQLReturning.returning(column: Self.model.$id.key)) - .query - - var id: UUID? - return db.execute(sql: query) { (row) -> Void in - print("••• columns: \(row.allColumns)") - id = try? row.decode(column: "\(Self.model.$id.key)", as: UUID.self) - print("••• returned id \(id)") - } - .map { - if (id != nil) { - return JobIdentifier(string: id!.uuidString) - } - return nil + var popProvider: PopQueryProtocol! + switch (self.dbType) { + case .postgres: + popProvider = PostgresPop() + case .mysql: + popProvider = MySQLPop() + default: + return self.context.eventLoop.makeFailedFuture(QueuesFluentError.databaseNotFound) + } + return popProvider.pop(db: database, select: selectQuery.query).optionalMap { id in + return JobIdentifier(string: id.uuidString) } - - // UPDATE `jobs` - // SET `state` = ?, `updated_at` = ? - // WHERE `id` = (SELECT `id` FROM `jobs` WHERE `state` = ? ORDER BY `created_at` ASC LIMIT 1 FOR UPDATE SKIP LOCKED) - // OR 1=2 - // RETURNING "id" - - // -- should be -- - - // BEGIN TRANSACTION - // SELECT `id` FROM `jobs` WHERE `state` = ? ORDER BY `created_at` ASC LIMIT 1 FOR UPDATE SKIP LOCKED; - // UPDATE `jobs` - // SET - // `state` = ?, - // `updated_at` = ? - // WHERE `id` = xxxxxxx; - // COMMIT - /*let driver = dbDriver() - return driver.rawQuery(db: database, query: query).map { id in - if(id != nil ) { - return JobIdentifier(string: id!.uuidString) - } - else { - return nil - } - }*/ - - /*let (sql, binds) = db.serialize(query) - return db.query(db: db, sql: sql, binds: binds).first().optionalMap { row in - return JobIdentifier(string: (try! row.decode(column: "\(Self.model.$id.key)", as: UUID.self)).uuidString) - } - */ } } diff --git a/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift new file mode 100644 index 0000000..9ddbf20 --- /dev/null +++ b/Sources/QueuesFluentDriver/PopQueries/MySQLPopQuery.swift @@ -0,0 +1,34 @@ +import Foundation +import SQLKit +import Fluent +import Queues + +struct MySQLPop : PopQueryProtocol { + func pop(db: Database, select: SQLExpression) -> EventLoopFuture { + db.transaction { transaction in + let database = transaction as! SQLDatabase + + var id: UUID? + return database.execute(sql: select) { (row) -> Void in + print("••• columns: \(row.allColumns)") + id = try? row.decode(column: "\(FluentQueue.model.$id.key)", as: UUID.self) + print("••• returned id \(id)") + } + .flatMap { + if (id != nil) { + let updateQuery = database + .update(JobModel.schema) + .set(SQLColumn.init("\(FluentQueue.model.$state.key)"), to: SQLBind.init(JobState.processing)) + .set(SQLColumn.init("\(FluentQueue.model.$updatedAt.path.first!)"), to: SQLBind.init(Date())) + .where(SQLColumn.init("\(FluentQueue.model.$id.key)"), .equal, SQLBind.init(id!)) + .query + return database.execute(sql: updateQuery) { (row) in } + .map { id } + } + return database.eventLoop.makeSucceededFuture(nil) + } + + + } + } +} diff --git a/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift b/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift new file mode 100644 index 0000000..1e8cc78 --- /dev/null +++ b/Sources/QueuesFluentDriver/PopQueries/PostgresPopQuery.swift @@ -0,0 +1,29 @@ +import Foundation +import SQLKit +import Fluent +import Queues + +struct PostgresPop : PopQueryProtocol { + func pop(db: Database, select: SQLExpression) -> EventLoopFuture { + let database = db as! SQLDatabase + let subQueryGroup = SQLGroupExpression.init(select) + let query = database + .update(JobModel.schema) + .set(SQLColumn.init("\(FluentQueue.model.$state.key)"), to: SQLBind.init(JobState.processing)) + .set(SQLColumn.init("\(FluentQueue.model.$updatedAt.path.first!)"), to: SQLBind.init(Date())) + .where( + SQLBinaryExpression(left: SQLColumn("\(FluentQueue.model.$id.key)"), op: SQLBinaryOperator.equal , right: subQueryGroup) + ) + // Gross abuse + .orWhere(SQLReturning.returning(column: FluentQueue.model.$id.key)) + .query + + var id: UUID? + return database.execute(sql: query) { (row) -> Void in + id = try? row.decode(column: "\(FluentQueue.model.$id.key)", as: UUID.self) + } + .map { + return id + } + } +}