Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ruby Client v4.2.0 #138

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

All notable changes to this project will be documented in this file.

## [4.2.0] 2024-12-18

- **Fixes**
- [CLIENT-3195] Fix ruby client does not return failed nodes on timeout.

## [4.1.0] 2024-10-22

- **New Features**
Expand Down
44 changes: 22 additions & 22 deletions lib/aerospike/aerospike_exception.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,79 +21,79 @@
module Aerospike
module Exceptions
class Aerospike < StandardError
attr_reader :result_code
attr_reader :result_code, :failed_nodes

def initialize(result_code, message = nil)
def initialize(result_code, message = nil, failed_nodes = nil)
@result_code = result_code
@failed_nodes = failed_nodes
message ||= ResultCode.message(result_code)
super(message)
end
end

class Timeout < Aerospike
attr_reader :timeout, :iterations, :failed_nodes, :failed_connections
attr_reader :timeout, :iterations, :failed_connections

def initialize(timeout, iterations, failed_nodes=nil, failed_connections=nil)
@timeout = timeout
@iterations = iterations
@failed_nodes = failed_nodes
@failed_connections = failed_connections

super(ResultCode::TIMEOUT)
super(ResultCode::TIMEOUT, nil, failed_nodes)
end
end

class InvalidCredentials < Aerospike
def initialize(msg = nil)
super(ResultCode::NOT_AUTHENTICATED, msg)
def initialize(msg = nil, node=nil)
super(ResultCode::NOT_AUTHENTICATED, msg, [node])
end
end

class Serialize < Aerospike
def initialize(msg=nil)
super(ResultCode::SERIALIZE_ERROR, msg)
super(ResultCode::SERIALIZE_ERROR, msg, [node])
end
end

class Parse < Aerospike
def initialize(msg=nil)
super(ResultCode::PARSE_ERROR, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::PARSE_ERROR, msg, [node])
end
end

class Connection < Aerospike
def initialize(msg=nil)
super(ResultCode::SERVER_NOT_AVAILABLE, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::SERVER_NOT_AVAILABLE, msg, [node])
end
end

class InvalidNode < Aerospike
def initialize(msg=nil)
super(ResultCode::INVALID_NODE_ERROR, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::INVALID_NODE_ERROR, msg, [node])
end
end

class ScanTerminated < Aerospike
def initialize(msg=nil)
super(ResultCode::SCAN_TERMINATED, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::SCAN_TERMINATED, msg, [node])
end
end

class QueryTerminated < Aerospike
def initialize(msg=nil)
super(ResultCode::QUERY_TERMINATED, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::QUERY_TERMINATED, msg, [node])
end
end

class CommandRejected < Aerospike
def initialize(msg=nil)
super(ResultCode::COMMAND_REJECTED, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::COMMAND_REJECTED, msg, [node])
end
end

class InvalidNamespace < Aerospike
def initialize(msg=nil)
super(ResultCode::INVALID_NAMESPACE, msg)
def initialize(msg=nil, node=nil)
super(ResultCode::INVALID_NAMESPACE, msg, [node])
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def truncate(namespace, set_name = nil, before_last_update = nil, options = {})

response = send_info_command(policy, str_cmd, node).upcase
return if response == "OK"
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, "Truncate failed: #{response}")
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, "Truncate failed: #{response}", [node])
end

#-------------------------------------------------------
Expand Down
30 changes: 15 additions & 15 deletions lib/aerospike/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def batch_read_node(partition, replica_policy)
when Aerospike::Replica::RANDOM
random_node
else
raise Aerospike::Exceptions::InvalidNode("invalid policy.replica value")
raise Aerospike::Exceptions::InvalidNode.new("invalid policy.replica value")
end
end

Expand All @@ -147,21 +147,21 @@ def read_node(partition, replica_policy, seq)
when Aerospike::Replica::RANDOM
random_node
else
raise Aerospike::Exceptions::InvalidNode("invalid policy.replica value")
raise Aerospike::Exceptions::InvalidNode.new("invalid policy.replica value")
end
end

# Returns a node on the cluster for read operations
def master_node(partition)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

node_array = replica_array.get[0]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless node_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless node_array

node = node_array.get[partition.partition_id]
raise Aerospike::Exceptions::InvalidNode if !node || !node.active?
raise Aerospike::Exceptions::InvalidNode.new("no active node found") if !node || !node.active?

node
end
Expand All @@ -170,7 +170,7 @@ def master_node(partition)
def rack_node(partition, seq)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

replica_array = replica_array.get

Expand All @@ -195,14 +195,14 @@ def rack_node(partition, seq)

return fallback if fallback

raise Aerospike::Exceptions::InvalidNode
raise Aerospike::Exceptions::InvalidNode.new("no active node found")
end

# Returns a node on the cluster for read operations
def master_proles_node(partition)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

replica_array = replica_array.get

Expand All @@ -214,14 +214,14 @@ def master_proles_node(partition)
return node if node && node.active?
end

raise Aerospike::Exceptions::InvalidNode
raise Aerospike::Exceptions::InvalidNode.new("no active node found")
end

# Returns a random node on the cluster
def sequence_node(partition, seq)
partition_map = partitions
replica_array = partition_map[partition.namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

replica_array = replica_array.get

Expand All @@ -233,7 +233,7 @@ def sequence_node(partition, seq)
return node if node && node.active?
end

raise Aerospike::Exceptions::InvalidNode
raise Aerospike::Exceptions::InvalidNode.new("node active node found")
end

def get_node_for_key(replica_policy, key, is_write: false)
Expand All @@ -251,10 +251,10 @@ def node_partitions(node, namespace)

partition_map = partitions
replica_array = partition_map[namespace]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless replica_array

node_array = replica_array.get[0]
raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless node_array
raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map") unless node_array


pid = 0
Expand All @@ -281,7 +281,7 @@ def random_node

i = i.succ
end
raise Aerospike::Exceptions::InvalidNode
raise Aerospike::Exceptions::InvalidNode.new("no active node found")
end

# Returns a list of all nodes in the cluster
Expand All @@ -296,7 +296,7 @@ def nodes
def get_node_by_name(node_name)
node = find_node_by_name(node_name)

raise Aerospike::Exceptions::InvalidNode unless node
raise Aerospike::Exceptions::InvalidNode.new("node `#{node_name}` not found") unless node

node
end
Expand Down
8 changes: 5 additions & 3 deletions lib/aerospike/cluster/partition_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def update_partitions(current_map)

info = info_map[REPLICAS_ALL]
if !info || info.length == 0
raise Aerospike::Exceptions::Connection.new("#{REPLICAS_ALL} response for node #{@node.name} is empty")
raise Aerospike::Exceptions::Connection.new("#{REPLICAS_ALL} response for node #{@node.name} is empty", @node)
end

@buffer = info
Expand Down Expand Up @@ -112,7 +112,8 @@ def parse_name
if namespace.length <= 0 || namespace.length >= 32
response = get_truncated_response
raise Aerospike::Exceptions::Parse.new(
"Invalid partition namespace #{namespace}. Response=#{response}"
"Invalid partition namespace #{namespace}. Response=#{response}",
@node
)
end

Expand All @@ -133,7 +134,8 @@ def parse_replica_count
if count < 0 || count > 4096
response = get_truncated_response
raise Aerospike::Exceptions::Parse.new(
"Invalid partition count #{count}. Response=#{response}"
"Invalid partition count #{count}. Response=#{response}",
@node
)
end

Expand Down
10 changes: 6 additions & 4 deletions lib/aerospike/cluster/rack_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def update_racks

info = info_map[RACK_IDS]
if !info || info.length == 0
raise Aerospike::Exceptions::Connection.new("#{RACK_IDS} response for node #{@node.name} is empty")
raise Aerospike::Exceptions::Connection.new("#{RACK_IDS} response for node #{@node.name} is empty", @node)
end

@buffer = info
Expand All @@ -54,7 +54,7 @@ def update_racks
namespace = parse_name
rack_id = parse_rack_id

@racks = {} if !@racks
@racks ||= {}
@racks[namespace] = rack_id
end

Expand All @@ -76,7 +76,8 @@ def parse_name
if namespace.length <= 0 || namespace.length >= 32
response = get_truncated_response
raise Aerospike::Exceptions::Parse.new(
"Invalid rack namespace #{namespace}. Response=#{response}"
"Invalid rack namespace #{namespace}. Response=#{response}",
@node
)
end

Expand All @@ -97,7 +98,8 @@ def parse_rack_id
if rack_id < 0
response = get_truncated_response
raise Aerospike::Exceptions::Parse.new(
"Invalid rack_id #{rack_id}. Response=#{response}"
"Invalid rack_id #{rack_id}. Response=#{response}",
@node
)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/command/batch_index_exists_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def parse_row(result_code)
op_count = @data_buffer.read_int16(20)

if op_count > 0
raise Aerospike::Exceptions::Parse.new('Received bins that were not requested!')
raise Aerospike::Exceptions::Parse.new('Received bins that were not requested!', @node)
end

skip_key(field_count)
Expand Down
9 changes: 7 additions & 2 deletions lib/aerospike/command/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ def set_query(cluster, policy, statement, background, node_partitions)
if operations

unless background
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::PARAMETER_ERROR)
raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::PARAMETER_ERROR, nil, [@node])
end

operations.each do |operation|
Expand Down Expand Up @@ -685,6 +685,7 @@ def set_query(cluster, policy, statement, background, node_partitions)

def execute
iterations = 0
failed_nodes = []

# set timeout outside the loop
limit = Time.now + @policy.timeout
Expand All @@ -705,6 +706,7 @@ def execute
@node = get_node
@conn = @node.get_connection(@policy.timeout)
rescue => e
failed_nodes << @node if @node
if @node
# Socket connection error has occurred. Decrease health and retry.
@node.decrease_health
Expand All @@ -724,6 +726,7 @@ def execute
begin
write_buffer
rescue => e
failed_nodes << @node if @node
Aerospike.logger.error(e)

# All runtime exceptions are considered fatal. Do not retry.
Expand All @@ -738,6 +741,7 @@ def execute
begin
@conn.write(@data_buffer, @data_offset)
rescue => e
failed_nodes << @node if @node
# IO errors are considered temporary anomalies. Retry.
# Close socket to flush out possible garbage. Do not put back in pool.
@conn.close if @conn
Expand All @@ -753,6 +757,7 @@ def execute
begin
parse_result
rescue => e
failed_nodes << @node if @node
case e
# do not log the following exceptions
when Aerospike::Exceptions::ScanTerminated
Expand Down Expand Up @@ -783,7 +788,7 @@ def execute
end # while

# execution timeout
raise Aerospike::Exceptions::Timeout.new(limit, iterations)
raise Aerospike::Exceptions::Timeout.new(limit, iterations, failed_nodes)
end

protected
Expand Down
4 changes: 2 additions & 2 deletions lib/aerospike/command/delete_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ def parse_result

if result_code == Aerospike::ResultCode::FILTERED_OUT
if @policy.fail_on_filtered_out
raise Aerospike::Exceptions::Aerospike.new(result_code)
raise Aerospike::Exceptions::Aerospike.new(result_code, nil, [@node])
end
@existed = true
return
end

raise Aerospike::Exceptions::Aerospike.new(result_code)
raise Aerospike::Exceptions::Aerospike.new(result_code, nil, [@node])
end

end # class
Expand Down
Loading
Loading