Skip to content

Commit

Permalink
write transactions that can span tables
Browse files Browse the repository at this point in the history
  • Loading branch information
ckhsponge committed Feb 12, 2024
1 parent 4520343 commit a5b7640
Show file tree
Hide file tree
Showing 25 changed files with 1,765 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
dynamoid (3.9.0)
dynamoid (3.10.0)
activemodel (>= 4)
aws-sdk-dynamodb (~> 1.0)
concurrent-ruby (>= 1.0)
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,11 @@ resolving the fields with a second query against the table since a query
against GSI then a query on base table is still likely faster than scan
on the base table*

### Transaction Writes

Multiple write actions can be grouped together and submitted as an all-or-nothing operation.
See the [transation documentation](README_transact.md).

### PartiQL

To run PartiQL statements `Dynamoid.adapter.execute` method should be
Expand Down Expand Up @@ -1350,6 +1355,7 @@ just as accessible to the Ruby world as MongoDB.
Also, without contributors the project wouldn't be nearly as awesome. So
many thanks to:

* [Chris Hobbs](https://github.com/ckhsponge)
* [Logan Bowers](https://github.com/loganb)
* [Lane LaRue](https://github.com/luxx)
* [Craig Heneveld](https://github.com/cheneveld)
Expand Down
118 changes: 118 additions & 0 deletions README_transact.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Transactions in Dynamoid

Synchronous write operations are supported in Dynamoid using transactions.
If any action in the transaction fails they all fail.
The following actions are supported:

* Create - add a new item if it does not already exist
* Upsert - add a new item or update an existing item, no callbacks
* Update - modifies one or more attributes from an existig item
* Delete - remove an item without callbacks, validations nor existence check
* Destroy - remove an item, fails if item does not exist

## Examples



### Save models
Models can be saved in a transaction.
New records are created otherwise the model is updated.
Save, create, update, validate and destroy callbacks are called around the transaction as appropriate.
Validation failures will throw Dynamoid::Errors::DocumentNotValid.

```ruby
user = User.find(1)
article = Article.new(body: 'New article text', user_id: user.id)
Dynamoid::TransactionWrite.execute do |txn|
txn.save!(article)
user.last_article_id = article.id
txn.save!(user)
end
```

### Create items
Items can be created inside of a transaction.
The hash key and range key, if applicable, are used to determine uniqueness.
Creating will fail with Aws::DynamoDB::Errors::TransactionCanceledException if an item already exists unless skip_existence_check is true.
This example creates a user with a unique id and unique email address by creating 2 items.
An additional item is upserted in the same transaction.
Upserts will update updated_at but will not create created_at.

```ruby
user_id = SecureRandom.uuid
email = '[email protected]'
Dynamoid::TransactionWrite.execute do |txn|
txn.create!(User, id: user_id)
txn.create!(UserEmail, id: "UserEmail##{email}", user_id: user_id)
txn.create!(Address, { id: 'A#2', street: '456' }, { skip_existence_check: true })
txn.upsert!(Address, id: 'A#1', street: '123')
end
```

### Destroy or delete items
Models can be used or the model class and key can be specified.
When the key is a single column it is specified as a single value or a hash
with the name of the hash key.
When using a composite key the key must be a hash with the hash key and range key.
destroy() uses callbacks and validations and fails if the item does not exist.
Use delete() to skip callbacks, validations and the existence check.

```ruby
article = Article.find(1)
tag = article.tag
Dynamoid::TransactionWrite.execute do |txn|
txn.destroy!(article)
txn.destroy!(Article, 2) # performs find() automatically and then runs destroy callbacks
txn.destroy!(tag)
txn.delete(Tag, 2) # delete record with hash key '2' if it exists
txn.delete(Tag, id: 2) # equivalent of the above if the hash key column is 'id'
txn.delete(Tag, id: 'key#abcd', my_sort_key: 'range#1') # when range key is required
end
```

### Skipping callbacks and validations
Validations and callbacks can be skipped per action.
Validation failures will throw Dynamoid::Errors::DocumentNotValid when using the bang! methods.
Note that validation callbacks are run when validation happens even if skipping callbacks here.
Skipping callbacks and validation guarantees no callbacks.

```ruby
user = User.find(1)
user.red = true
Dynamoid::TransactionWrite.execute do |txn|
txn.save!(user, skip_callbacks: true)
txn.create!(User, { name: 'bob' }, { skip_callbacks: true })
end
Dynamoid::TransactionWrite.execute do |txn|
txn.save!(user, skip_validation: true)
txn.create!(User, { name: 'bob' }, { skip_validation: true })
end
```

### Validation failures that don't raise
All of the transaction methods can be called without the bang! which results in
false instead of a raised exception when validation fails.
Ignoring validation failures can lead to confusion or bugs so always check return status when not using a bang!

```ruby
user = User.find(1)
user.red = true
Dynamoid::TransactionWrite.execute do |txn|
if txn.save(user) # won't raise validation exception
txn.update(UserCount, id: 'UserCount#Red', count: 5)
else
puts 'ALERT: user not valid, skipping'
end
end
```

### Incrementally building a transaction
Transactions can also be built without a block.

```ruby
transaction = Dynamoid::TransactionWrite.new
transaction.create!(User, id: user_id)
transaction.create!(UserEmail, id: "UserEmail##{email}", user_id: user_id)
transaction.upsert!(Address, id: 'A#1', street: '123')
transaction.commit
```
1 change: 1 addition & 0 deletions lib/dynamoid.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
require 'dynamoid/components'
require 'dynamoid/document'
require 'dynamoid/adapter'
require 'dynamoid/transaction_write'

require 'dynamoid/tasks/database'

Expand Down
5 changes: 5 additions & 0 deletions lib/dynamoid/adapter_plugin/aws_sdk_v3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require_relative 'aws_sdk_v3/item_updater'
require_relative 'aws_sdk_v3/table'
require_relative 'aws_sdk_v3/until_past_table_status'
require_relative 'aws_sdk_v3/transact'

module Dynamoid
# @private
Expand Down Expand Up @@ -289,6 +290,10 @@ def batch_delete_item(options)
raise Dynamoid::Errors::ConditionalCheckFailedException, e
end

def transact_write_items(items)
Transact.new(client).transact_write_items(items)
end

# Create a table on DynamoDB. This usually takes a long time to complete.
#
# @param [String] table_name the name of the table to create
Expand Down
31 changes: 31 additions & 0 deletions lib/dynamoid/adapter_plugin/aws_sdk_v3/transact.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

# Prepare all the actions of the transaction for sending to the AWS SDK.
module Dynamoid
module AdapterPlugin
class AwsSdkV3
class Transact
attr_reader :client

def initialize(client)
@client = client
end

# Perform all of the item actions in a single transaction.
#
# @param [Array] items of type Dynamoid::Transaction::Action or
# any other object whose to_h is a transact_item hash
#
def transact_write_items(items)
transact_items = items.map(&:to_h)
params = {
transact_items: transact_items,
return_consumed_capacity: 'TOTAL',
return_item_collection_metrics: 'SIZE'
}
client.transact_write_items(params) # returns this
end
end
end
end
end
1 change: 1 addition & 0 deletions lib/dynamoid/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Errors
# Generic Dynamoid error
class Error < StandardError; end

class MissingHashKey < Error; end
class MissingRangeKey < Error; end

class MissingIndex < Error; end
Expand Down
3 changes: 2 additions & 1 deletion lib/dynamoid/persistence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ module Dynamoid
module Persistence
extend ActiveSupport::Concern

attr_accessor :new_record
attr_accessor :new_record, :destroyed
alias new_record? new_record
alias destroyed? destroyed

# @private
UNIX_EPOCH_DATE = Date.new(1970, 1, 1).freeze
Expand Down
101 changes: 101 additions & 0 deletions lib/dynamoid/transaction_write.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# frozen_string_literal: true

require 'dynamoid/transaction_write/action'
require 'dynamoid/transaction_write/create'
require 'dynamoid/transaction_write/delete'
require 'dynamoid/transaction_write/destroy'
require 'dynamoid/transaction_write/update_upsert'
require 'dynamoid/transaction_write/update'
require 'dynamoid/transaction_write/upsert'

module Dynamoid
class TransactionWrite
attr_accessor :action_inputs, :models

def initialize(_options = {})
@action_inputs = []
@models = []
end

def self.execute(options = {})
transaction = new(options)
yield(transaction)
transaction.commit
end

def commit
return unless @action_inputs.present? # nothing to commit

Dynamoid.adapter.transact_write_items(@action_inputs)
models.each { |model| model.new_record = false }
end

def save!(model, options = {})
save(model, options.reverse_merge(raise_validation_error: true))
end

def save(model, options = {})
model.new_record? ? create(model, {}, options) : update(model, {}, options)
end

def create!(model_or_model_class, attributes = {}, options = {})
create(model_or_model_class, attributes, options.reverse_merge(raise_validation_error: true))
end

def create(model_or_model_class, attributes = {}, options = {})
add_action_and_validate Dynamoid::TransactionWrite::Create.new(model_or_model_class, attributes, options)
end

# upsert! does not exist because upserting instances that can raise validation errors is not officially supported

def upsert(model_or_model_class, attributes = {}, options = {})
add_action_and_validate Dynamoid::TransactionWrite::Upsert.new(model_or_model_class, attributes, options)
end

def update!(model_or_model_class, attributes = {}, options = {})
update(model_or_model_class, attributes, options.reverse_merge(raise_validation_error: true))
end

def update(model_or_model_class, attributes = {}, options = {})
add_action_and_validate Dynamoid::TransactionWrite::Update.new(model_or_model_class, attributes, options)
end

def delete(model_or_model_class, key_or_attributes = {}, options = {})
add_action_and_validate Dynamoid::TransactionWrite::Delete.new(model_or_model_class, key_or_attributes, options)
end

def destroy!(model_or_model_class, key_or_attributes = {}, options = {})
destroy(model_or_model_class, key_or_attributes, options.reverse_merge(raise_validation_error: true))
end

def destroy(model_or_model_class, key_or_attributes = {}, options = {})
add_action_and_validate Dynamoid::TransactionWrite::Destroy.new(model_or_model_class, key_or_attributes, options)
end

private

# validates unless validations are skipped
# runs callbacks unless callbacks are skipped
# raise validation error or returns false if not valid
# otherwise adds hash of action to list in preparation for committing
def add_action_and_validate(action)
if !action.skip_validation? && !action.valid?
raise Dynamoid::Errors::DocumentNotValid, action.model if action.raise_validation_error?

return false
end

if action.skip_callbacks?
@action_inputs << action.to_h
else
action.run_callbacks do
@action_inputs << action.to_h
end
end
action.changes_applied # action has been processed and added to queue so mark as applied
models << action.model if action.model

action.model || true # return model if it exists
end
end
end
Loading

0 comments on commit a5b7640

Please sign in to comment.