Skip to content

Commit

Permalink
WIP on Clients
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnyshields committed May 17, 2024
1 parent 9eaac0a commit b2ce94d
Show file tree
Hide file tree
Showing 27 changed files with 1,905 additions and 93 deletions.
676 changes: 676 additions & 0 deletions lib/active_document/adapters/dynamodb_aws_sdk_v3/aws_sdk_v3.rb

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# frozen_string_literal: true

module Dynamoid
# @private
module AdapterPlugin
class AwsSdkV3
# Documentation
# https://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#batch_get_item-instance_method
class BatchGetItem
attr_reader :client, :tables_with_ids, :options

def initialize(client, tables_with_ids, options = {})
@client = client
@tables_with_ids = tables_with_ids
@options = options
end

def call
results = {}

tables_with_ids.each do |table, ids|
if ids.blank?
results[table.name] = []
next
end

ids = Array(ids).dup

while ids.present?
batch = ids.shift(Dynamoid::Config.batch_size)
request = build_request(table, batch)
api_response = client.batch_get_item(request)
response = Response.new(api_response)

if block_given?
# return batch items as a result
batch_results = Hash.new([].freeze)
batch_results.update(response.items_grouped_by_table)

yield(batch_results, response.successful_partially?)
else
# collect all the batches to return at the end
results.update(response.items_grouped_by_table) { |_, its1, its2| its1 + its2 }
end

if response.successful_partially?
ids += response.unprocessed_ids(table)
end
end
end

results unless block_given?
end

private

def build_request(table, ids)
ids = Array(ids)

keys = if table.range_key.nil?
ids.map { |hk| { table.hash_key => hk } }
else
ids.map { |hk, rk| { table.hash_key => hk, table.range_key => rk } }
end

{
request_items: {
table.name => {
keys: keys,
consistent_read: options[:consistent_read]
}
}
}
end

# Helper class to work with response
class Response
def initialize(api_response)
@api_response = api_response
end

def successful_partially?
@api_response.unprocessed_keys.present?
end

def unprocessed_ids(table)
# unprocessed_keys Hash contains as values instances of
# Aws::DynamoDB::Types::KeysAndAttributes
@api_response.unprocessed_keys[table.name].keys.map do |h|
# If a table has a composite primary key then we need to return an array
# of [hash key, range key]. Otherwise just return hash key's
# value.
if table.range_key.nil?
h[table.hash_key.to_s]
else
[h[table.hash_key.to_s], h[table.range_key.to_s]]
end
end
end

def items_grouped_by_table
# data[:responses] is a Hash[table_name -> items]
@api_response.data[:responses].transform_values do |items|
items.map(&method(:item_to_hash))
end
end

private

def item_to_hash(item)
item.symbolize_keys
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
# frozen_string_literal: true

require_relative 'until_past_table_status'

module Dynamoid
# @private
module AdapterPlugin
class AwsSdkV3
class CreateTable
attr_reader :client, :table_name, :key, :options

def initialize(client, table_name, key, options)
@client = client
@table_name = table_name
@key = key
@options = options
end

def call
billing_mode = options[:billing_mode]
read_capacity = options[:read_capacity] || Dynamoid::Config.read_capacity
write_capacity = options[:write_capacity] || Dynamoid::Config.write_capacity

secondary_indexes = options.slice(
:local_secondary_indexes,
:global_secondary_indexes
)
ls_indexes = options[:local_secondary_indexes]
gs_indexes = options[:global_secondary_indexes]

key_schema = {
hash_key_schema: { key => options[:hash_key_type] || :string },
range_key_schema: options[:range_key]
}
attribute_definitions = build_all_attribute_definitions(
key_schema,
secondary_indexes
)
key_schema = aws_key_schema(
key_schema[:hash_key_schema],
key_schema[:range_key_schema]
)

client_opts = {
table_name: table_name,
key_schema: key_schema,
attribute_definitions: attribute_definitions
}

if billing_mode == :on_demand
client_opts[:billing_mode] = 'PAY_PER_REQUEST'
else
client_opts[:billing_mode] = 'PROVISIONED'
client_opts[:provisioned_throughput] = {
read_capacity_units: read_capacity,
write_capacity_units: write_capacity
}
end

if ls_indexes.present?
client_opts[:local_secondary_indexes] = ls_indexes.map do |index|
index_to_aws_hash(index)
end
end

if gs_indexes.present?
client_opts[:global_secondary_indexes] = gs_indexes.map do |index|
index_to_aws_hash(index)
end
end
resp = client.create_table(client_opts)
options[:sync] = true if (!options.key?(:sync) && ls_indexes.present?) || gs_indexes.present?

if options[:sync]
status = PARSE_TABLE_STATUS.call(resp, :table_description)
if status == TABLE_STATUSES[:creating]
UntilPastTableStatus.new(client, table_name, :creating).call
end
end

# Response to original create_table, which, if options[:sync]
# may have an outdated table_description.table_status of "CREATING"
resp
end

private

# Builds aws attributes definitions based off of primary hash/range and
# secondary indexes
#
# @param key_schema
# @option key_schema [Hash] hash_key_schema - eg: {:id => :string}
# @option key_schema [Hash] range_key_schema - eg: {:created_at => :number}
# @param [Hash] secondary_indexes
# @option secondary_indexes [Array<Dynamoid::Indexes::Index>] :local_secondary_indexes
# @option secondary_indexes [Array<Dynamoid::Indexes::Index>] :global_secondary_indexes
def build_all_attribute_definitions(key_schema, secondary_indexes = {})
ls_indexes = secondary_indexes[:local_secondary_indexes]
gs_indexes = secondary_indexes[:global_secondary_indexes]

attribute_definitions = []

attribute_definitions << build_attribute_definitions(
key_schema[:hash_key_schema],
key_schema[:range_key_schema]
)

if ls_indexes.present?
ls_indexes.map do |index|
attribute_definitions << build_attribute_definitions(
index.hash_key_schema,
index.range_key_schema
)
end
end

if gs_indexes.present?
gs_indexes.map do |index|
attribute_definitions << build_attribute_definitions(
index.hash_key_schema,
index.range_key_schema
)
end
end

attribute_definitions.flatten!
# uniq these definitions because range keys might be common between
# primary and secondary indexes
attribute_definitions.uniq!
attribute_definitions
end

# Builds an attribute definitions based on hash key and range key
# @param [Hash] hash_key_schema - eg: {:id => :string}
# @param [Hash] range_key_schema - eg: {:created_at => :datetime}
# @return [Array]
def build_attribute_definitions(hash_key_schema, range_key_schema = nil)
attrs = []

attrs << attribute_definition_element(
hash_key_schema.keys.first,
hash_key_schema.values.first
)

if range_key_schema.present?
attrs << attribute_definition_element(
range_key_schema.keys.first,
range_key_schema.values.first
)
end

attrs
end

# Builds an aws attribute definition based on name and dynamoid type
# @param [Symbol] name - eg: :id
# @param [Symbol] dynamoid_type - eg: :string
# @return [Hash]
def attribute_definition_element(name, dynamoid_type)
aws_type = api_type(dynamoid_type)

{
attribute_name: name.to_s,
attribute_type: aws_type
}
end

# Converts from symbol to the API string for the given data type
# E.g. :number -> 'N'
def api_type(type)
case type
when :string then STRING_TYPE
when :number then NUM_TYPE
when :binary then BINARY_TYPE
else raise "Unknown type: #{type}"
end
end

# Converts a Dynamoid::Indexes::Index to an AWS API-compatible hash.
# This resulting hash is of the form:
#
# {
# index_name: String
# keys: {
# hash_key: aws_key_schema (hash)
# range_key: aws_key_schema (hash)
# }
# projection: {
# projection_type: (ALL, KEYS_ONLY, INCLUDE) String
# non_key_attributes: (optional) Array
# }
# provisioned_throughput: {
# read_capacity_units: Integer
# write_capacity_units: Integer
# }
# }
#
# @param [Dynamoid::Indexes::Index] index the index.
# @return [Hash] hash representing an AWS Index definition.
def index_to_aws_hash(index)
key_schema = aws_key_schema(index.hash_key_schema, index.range_key_schema)

hash = {
index_name: index.name,
key_schema: key_schema,
projection: {
projection_type: index.projection_type.to_s.upcase
}
}

# If the projection type is include, specify the non key attributes
if index.projection_type == :include
hash[:projection][:non_key_attributes] = index.projected_attributes
end

# Only global secondary indexes have a separate throughput.
if index.type == :global_secondary && options[:billing_mode] != :on_demand
hash[:provisioned_throughput] = {
read_capacity_units: index.read_capacity,
write_capacity_units: index.write_capacity
}
end
hash
end

# Converts hash_key_schema and range_key_schema to aws_key_schema
# @param [Hash] hash_key_schema eg: {:id => :string}
# @param [Hash] range_key_schema eg: {:created_at => :number}
# @return [Array]
def aws_key_schema(hash_key_schema, range_key_schema)
schema = [{
attribute_name: hash_key_schema.keys.first.to_s,
key_type: HASH_KEY
}]

if range_key_schema.present?
schema << {
attribute_name: range_key_schema.keys.first.to_s,
key_type: RANGE_KEY
}
end
schema
end
end
end
end
end
Loading

0 comments on commit b2ce94d

Please sign in to comment.