Skip to content

Commit

Permalink
Merge pull request #126 from aerospike/CLIENT-2575
Browse files Browse the repository at this point in the history
CLIENT-2575 Supports new queries and record size filter
  • Loading branch information
vmsachin authored Dec 15, 2023
2 parents b3f25d9 + 7928e24 commit f6d8e31
Show file tree
Hide file tree
Showing 17 changed files with 259 additions and 546 deletions.
2 changes: 1 addition & 1 deletion lib/aerospike/cdt/map_operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def initialize(op_type, map_op, bin_name, *arguments, ctx: nil, return_type: nil
def self.create(bin_name, order, ctx: nil)
if !ctx || ctx.length == 0
# If context not defined, the set order for top-level bin map.
self.set_policy(MapPolicy.new(order: order, flag: 0), bin_name)
self.set_policy(MapPolicy.new(order: order, flags: 0), bin_name)
else
MapOperation.new(Operation::CDT_MODIFY, SET_TYPE, bin_name, order[:attr], ctx: ctx, flag: order[:flag])
end
Expand Down
7 changes: 2 additions & 5 deletions lib/aerospike/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ def execute_udf(key, package_name, function_name, args = [], options = nil)
# This method is only supported by Aerospike 3 servers.
# If the policy is nil, the default relevant policy will be used.
def execute_udf_on_query(statement, package_name, function_name, function_args = [], options = nil)
policy = create_policy(options, QueryPolicy, default_query_policy)
policy = create_policy(options, WritePolicy, default_write_policy)

nodes = @cluster.nodes
if nodes.empty?
Expand All @@ -538,14 +538,12 @@ def execute_udf_on_query(statement, package_name, function_name, function_args =

statement = statement.clone
statement.set_aggregate_function(package_name, function_name, function_args, false)

# Use a thread per node
nodes.each do |node|
partitions = node.cluster.node_partitions(node, statement.namespace)
Thread.new do
Thread.current.abort_on_exception = true
begin
command = QueryCommand.new(node, policy, statement, nil, partitions)
command = ServerCommand.new(@cluster, node, policy, statement, true, statement.task_id)
execute_command(command)
rescue => e
Aerospike.logger.error(e)
Expand Down Expand Up @@ -701,7 +699,6 @@ def scan_node(node, namespace, set_name, bin_names = nil, options = nil)
# If the policy is nil, the default relevant policy will be used.
def query_partitions(partition_filter, statement, options = nil)
policy = create_policy(options, QueryPolicy, default_query_policy)
new_policy = policy.clone

nodes = @cluster.nodes
if nodes.empty?
Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/command/batch_direct_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def write_buffer
operation_count = bin_names.length
end

write_header(policy, read_attr, 0, 2, operation_count)
write_header_read(policy, read_attr, 0, 2, operation_count)
write_field_string(batch.namespace, Aerospike::FieldType::NAMESPACE)
write_field_header(byte_size, Aerospike::FieldType::DIGEST_RIPE_ARRAY)

Expand Down
2 changes: 1 addition & 1 deletion lib/aerospike/command/batch_index_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def write_buffer
end
end
size_buffer
write_header(policy,read_attr | INFO1_BATCH, 0, field_count, 0)
write_header_read(policy, read_attr | INFO1_BATCH, 0, field_count, 0)

write_predexp(@policy.predexp, predexp_size)

Expand Down
Loading

0 comments on commit f6d8e31

Please sign in to comment.