Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.30.0 #129

Merged
merged 9 commits into from
Dec 18, 2023
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -2,6 +2,15 @@

All notable changes to this project will be documented in this file.

## [3.0.0] 2023-12-15
Notice: This version of the client only supports Aerospike Server v6.0 and later. Some features will work for the older server versions.
- **new_features**
- [CLIENT-2575] - Support Exp.recordSize().
- [CLIENT-2621] - Support persistent map indexes.

vmsachin marked this conversation as resolved.
Show resolved Hide resolved
- **improvements**
- [CLIENT-2590] - Required Updates Following Server-Side Changes: SINDEX Support for 'Blob' Type Elements.

## [2.29.0] 2023-08-24
- **Updates**
- [CLIENT-2526] Support for set quota for user defined roles
34 changes: 24 additions & 10 deletions lib/aerospike/cdt/map_operation.rb
Original file line number Diff line number Diff line change
@@ -110,14 +110,23 @@ def initialize(op_type, map_op, bin_name, *arguments, ctx: nil, return_type: nil
self
end

##
# Creates a map create operation.
# Server creates map at given context level.
def self.create(bin_name, order, ctx: nil)
if !ctx || ctx.length == 0
# Create map create operation.
# Server creates a map at the given context level.
#
# @param [String] bin_name The bin name.
# @param [Integer] order The map order.
# @param [Boolean] persist_index If true, persist map index. A map index improves lookup performance,
# but requires more storage. A map index can be created for a top-level
# ordered map only. Nested and unordered map indexes are not supported.
# @param [String] ctx Optional path to a nested map. If not defined, the top-level map is used.
def self.create(bin_name, order, persistent_index, ctx: nil)
if !ctx || ctx.empty?
# If context not defined, the set order for top-level bin map.
self.set_policy(MapPolicy.new(order: order, flag: 0), bin_name)
attr = order[:attr]
attr += 0x10 if persistent_index
MapOperation.new(Operation::CDT_MODIFY, SET_TYPE, bin_name, attr, ctx: ctx, flag: order[:flag])
else
# Create nested map. persistIndex does not apply here, so ignore it
MapOperation.new(Operation::CDT_MODIFY, SET_TYPE, bin_name, order[:attr], ctx: ctx, flag: order[:flag])
end
end
@@ -128,7 +137,12 @@ def self.create(bin_name, order, ctx: nil)
#
# The required map policy attributes can be changed after the map is created.
def self.set_policy(bin_name, policy, ctx: nil)
MapOperation.new(Operation::CDT_MODIFY, SET_TYPE, bin_name, policy.order[:attr], ctx: ctx)
attr = policy.attributes
# Remove persistIndex flag for nested maps.
if !ctx.nil? && !ctx.empty? && (attr & 0x10) != 0
attr &= ~0x10
end
MapOperation.new(Operation::CDT_MODIFY, SET_TYPE, bin_name, attr, ctx: ctx)
end

##
@@ -635,7 +649,7 @@ def pack_bin_value
args.unshift(return_type) if return_type

Packer.use do |packer|
if @ctx != nil && @ctx.length > 0
if @ctx != nil && !@ctx.empty?
packer.write_array_header(3)
Value.of(0xff).pack(packer)

@@ -645,12 +659,12 @@ def pack_bin_value
Value.of(@map_op).pack(packer)
else
packer.write_raw_short(@map_op)
if args.length > 0
if !args.empty?
packer.write_array_header(args.length)
end
end

if args.length > 0
if !args.empty?
args.each do |value|
Value.of(value).pack(packer)
end
9 changes: 6 additions & 3 deletions lib/aerospike/cdt/map_policy.rb
Original file line number Diff line number Diff line change
@@ -17,10 +17,9 @@
module Aerospike
module CDT
class MapPolicy
attr_accessor :order, :write_mode, :flags
attr_accessor :item_command, :items_command, :attributes
attr_accessor :order, :write_mode, :flags, :item_command, :items_command, :attributes, :persist_index

def initialize(order: nil, write_mode: nil, flags: nil)
def initialize(order: nil, write_mode: nil, persist_index: false, flags: nil)
if write_mode && flags
raise ArgumentError, "Use write mode for server versions < 4.3; use write flags for server versions >= 4.3."
end
@@ -30,6 +29,10 @@ def initialize(order: nil, write_mode: nil, flags: nil)
@flags = flags || MapWriteFlags::DEFAULT
@attributes = order ? order[:attr] : 0

if @persist_index
@attributes |= 0x10
end

case @write_mode
when CDT::MapWriteMode::DEFAULT
@item_command = CDT::MapOperation::PUT
7 changes: 2 additions & 5 deletions lib/aerospike/client.rb
Original file line number Diff line number Diff line change
@@ -529,7 +529,7 @@ def execute_udf(key, package_name, function_name, args = [], options = nil)
# This method is only supported by Aerospike 3 servers.
# If the policy is nil, the default relevant policy will be used.
def execute_udf_on_query(statement, package_name, function_name, function_args = [], options = nil)
policy = create_policy(options, QueryPolicy, default_query_policy)
policy = create_policy(options, WritePolicy, default_write_policy)

nodes = @cluster.nodes
if nodes.empty?
@@ -538,14 +538,12 @@ def execute_udf_on_query(statement, package_name, function_name, function_args =

statement = statement.clone
statement.set_aggregate_function(package_name, function_name, function_args, false)

# Use a thread per node
nodes.each do |node|
partitions = node.cluster.node_partitions(node, statement.namespace)
Thread.new do
Thread.current.abort_on_exception = true
begin
command = QueryCommand.new(node, policy, statement, nil, partitions)
command = ServerCommand.new(@cluster, node, policy, statement, true, statement.task_id)
execute_command(command)
rescue => e
Aerospike.logger.error(e)
@@ -701,7 +699,6 @@ def scan_node(node, namespace, set_name, bin_names = nil, options = nil)
# If the policy is nil, the default relevant policy will be used.
def query_partitions(partition_filter, statement, options = nil)
policy = create_policy(options, QueryPolicy, default_query_policy)
new_policy = policy.clone

nodes = @cluster.nodes
if nodes.empty?
2 changes: 1 addition & 1 deletion lib/aerospike/command/batch_direct_command.rb
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ def write_buffer
operation_count = bin_names.length
end

write_header(policy, read_attr, 0, 2, operation_count)
write_header_read(policy, read_attr, 0, 2, operation_count)
write_field_string(batch.namespace, Aerospike::FieldType::NAMESPACE)
write_field_header(byte_size, Aerospike::FieldType::DIGEST_RIPE_ARRAY)

2 changes: 1 addition & 1 deletion lib/aerospike/command/batch_index_command.rb
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@ def write_buffer
end
end
size_buffer
write_header(policy,read_attr | INFO1_BATCH, 0, field_count, 0)
write_header_read(policy, read_attr | INFO1_BATCH, 0, field_count, 0)

write_predexp(@policy.predexp, predexp_size)

187 changes: 138 additions & 49 deletions lib/aerospike/command/command.rb

Large diffs are not rendered by default.

19 changes: 17 additions & 2 deletions lib/aerospike/exp/exp.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# encoding: utf-8
# Copyright 2014-2022 Aerospike, Inc.
# Copyright 2014-2023 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements.
@@ -210,7 +210,7 @@ def self.hll_bin(name)
# # Bin "a" exists in record
# Exp.bin_exists("a")
def self.bin_exists(name)
return Exp.ne(Exp.bin_type(name), Exp.int_val(0))
Exp.ne(Exp.bin_type(name), Exp.int_val(0))
end

# Create expression that returns bin's integer particle type::
@@ -223,6 +223,20 @@ def self.bin_type(name)
CmdStr.new(BIN_TYPE, name)
end

# Create expression that returns the record size. This expression usually evaluates
# quickly because record meta data is cached in memory.
# Requires server version 7.0+. This expression replaces {@link #deviceSize()} and
# {@link #memorySize()} since those older expressions are equivalent on server version 7.0+.
#
# {@code
# // Record size >= 100 KB
# Exp.ge(Exp.record_size(), Exp.val(100 * 1024))
# }
def self.record_size
Cmd.new(RECORD_SIZE)
end


#--------------------------------------------------
# Misc
#--------------------------------------------------
@@ -1010,6 +1024,7 @@ def write(buf, offset)
KEY_EXISTS = 71
IS_TOMBSTONE = 72
MEMORY_SIZE = 73
RECORD_SIZE = 74
KEY = 80
BIN = 81
BIN_TYPE = 82
13 changes: 13 additions & 0 deletions lib/aerospike/node.rb
Original file line number Diff line number Diff line change
@@ -26,6 +26,11 @@ class Node

PARTITIONS = 4096
FULL_HEALTH = 100
HAS_PARTITION_SCAN = 1 << 0
HAS_QUERY_SHOW = 1 << 1
HAS_BATCH_ANY = 1 << 2
HAS_PARTITION_QUERY = 1 << 3


# Initialize server node with connection parameters.
def initialize(cluster, nv)
@@ -58,6 +63,14 @@ def initialize(cluster, nv)
@connections = ::Aerospike::ConnectionPool.new(cluster, host)
end

def partition_query?
(@features & HAS_PARTITION_QUERY) != 0
end

def query_show?
(@features & HAS_QUERY_SHOW) != 0
end

def update_racks(parser)
new_racks = parser.update_racks
@racks.value = new_racks if new_racks
208 changes: 3 additions & 205 deletions lib/aerospike/query/query_command.rb
Original file line number Diff line number Diff line change
@@ -23,219 +23,17 @@ module Aerospike

class QueryCommand < StreamCommand #:nodoc:

def initialize(node, policy, statement, recordset, partitions)
def initialize(cluster, node, policy, statement, recordset, partitions)
super(node)

@cluster = cluster
@policy = policy
@statement = statement
@recordset = recordset
@partitions = partitions
end

def write_buffer
fieldCount = 0
filterSize = 0
binNameSize = 0
predSize = 0

begin_cmd

if @statement.namespace
@data_offset += @statement.namespace.bytesize + FIELD_HEADER_SIZE
fieldCount+=1
end

if @statement.index_name
@data_offset += @statement.index_name.bytesize + FIELD_HEADER_SIZE
fieldCount+=1
end

if @statement.set_name
@data_offset += @statement.set_name.bytesize + FIELD_HEADER_SIZE
fieldCount+=1
end

if !is_scan?
col_type = @statement.filters[0].collection_type
if col_type > 0
@data_offset += FIELD_HEADER_SIZE + 1
fieldCount += 1
end

@data_offset += FIELD_HEADER_SIZE
filterSize+=1 # num filters

@statement.filters.each do |filter|
sz = filter.estimate_size
filterSize += sz
end
@data_offset += filterSize
fieldCount+=1

if @statement.bin_names && @statement.bin_names.length > 0
@data_offset += FIELD_HEADER_SIZE
binNameSize+=1 # num bin names

@statement.bin_names.each do |bin_name|
binNameSize += bin_name.bytesize + 1
end
@data_offset += binNameSize
fieldCount+=1
end
else
@data_offset += @partitions.length * 2 + FIELD_HEADER_SIZE
fieldCount += 1

if @policy.records_per_second > 0
@data_offset += 4 + FIELD_HEADER_SIZE
fieldCount += 1
end

# Calling query with no filters is more efficiently handled by a primary index scan.
# Estimate scan options size.
# @data_offset += (2 + FIELD_HEADER_SIZE)
# fieldCount+=1
end

@statement.set_task_id

@data_offset += 8 + FIELD_HEADER_SIZE
fieldCount+=1

predexp = @policy.predexp || @statement.predexp

if predexp
@data_offset += FIELD_HEADER_SIZE
predSize = Aerospike::PredExp.estimate_size(predexp)
@data_offset += predSize
fieldCount += 1
end

if @statement.function_name
@data_offset += FIELD_HEADER_SIZE + 1 # udf type
@data_offset += @statement.package_name.bytesize + FIELD_HEADER_SIZE
@data_offset += @statement.function_name.bytesize + FIELD_HEADER_SIZE

if @statement.function_args && @statement.function_args.length > 0
functionArgBuffer = Value.of(@statement.function_args).to_bytes
else
functionArgBuffer = ''
end
@data_offset += FIELD_HEADER_SIZE + functionArgBuffer.bytesize
fieldCount += 4
end

if @statement.filters.nil? || @statement.filters.empty?
if @statement.bin_names && @statement.bin_names.length > 0
@statement.bin_names.each do |bin_name|
estimate_operation_size_for_bin_name(bin_name)
end
end
end

size_buffer

readAttr = @policy.include_bin_data ? INFO1_READ : INFO1_READ | INFO1_NOBINDATA
operation_count = (is_scan? && !@statement.bin_names.nil?) ? @statement.bin_names.length : 0

write_header(@policy, readAttr, 0, fieldCount, operation_count)

if @statement.namespace
write_field_string(@statement.namespace, Aerospike::FieldType::NAMESPACE)
end

unless @statement.index_name.nil?
write_field_string(@statement.index_name, Aerospike::FieldType::INDEX_NAME)
end

if @statement.set_name
write_field_string(@statement.set_name, Aerospike::FieldType::TABLE)
end

if !is_scan?
col_type = @statement.filters[0].collection_type
if col_type > 0
write_field_header(1, Aerospike::FieldType::INDEX_TYPE)
@data_buffer.write_byte(col_type, @data_offset)
@data_offset+=1
end

write_field_header(filterSize, Aerospike::FieldType::INDEX_RANGE)
@data_buffer.write_byte(@statement.filters.length, @data_offset)
@data_offset+=1

@statement.filters.each do |filter|
@data_offset = filter.write(@data_buffer, @data_offset)
end

# Query bin names are specified as a field (Scan bin names are specified later as operations)
if @statement.bin_names && @statement.bin_names.length > 0
write_field_header(binNameSize, Aerospike::FieldType::QUERY_BINLIST)
@data_buffer.write_byte(@statement.bin_names.length, @data_offset)
@data_offset += 1

@statement.bin_names.each do |bin_name|
len = @data_buffer.write_binary(bin_name, @data_offset + 1)
@data_buffer.write_byte(len, @data_offset)
@data_offset += len + 1;
end
end
else
write_field_header(@partitions.length * 2, Aerospike::FieldType::PID_ARRAY)
for pid in @partitions
@data_buffer.write_uint16_little_endian(pid, @data_offset)
@data_offset += 2
end

if @policy.records_per_second > 0
write_field_int(@policy.records_per_second, Aerospike::FieldType::RECORDS_PER_SECOND)
end

# Calling query with no filters is more efficiently handled by a primary index scan.
# write_field_header(2, Aerospike::FieldType::SCAN_OPTIONS)
# priority = @policy.priority.ord
# priority = priority << 4
# @data_buffer.write_byte(priority, @data_offset)
# @data_offset+=1
# @data_buffer.write_byte(100.ord, @data_offset)
# @data_offset+=1
end

write_field_header(8, Aerospike::FieldType::TRAN_ID)
@data_buffer.write_int64(@statement.task_id, @data_offset)
@data_offset += 8

if predexp
write_field_header(predSize, Aerospike::FieldType::PREDEXP)
@data_offset = Aerospike::PredExp.write(
predexp, @data_buffer, @data_offset
)
end

if @statement.function_name
write_field_header(1, Aerospike::FieldType::UDF_OP)
if @statement.return_data
@data_buffer.write_byte(1, @data_offset)
@data_offset+=1
else
@data_buffer.write_byte(2, @data_offset)
@data_offset+=1
end

write_field_string(@statement.package_name, Aerospike::FieldType::UDF_PACKAGE_NAME)
write_field_string(@statement.function_name, Aerospike::FieldType::UDF_FUNCTION)
write_field_bytes(functionArgBuffer, Aerospike::FieldType::UDF_ARGLIST)
end

if is_scan? && !@statement.bin_names.nil?
@statement.bin_names.each do |bin_name|
write_operation_for_bin_name(bin_name, Aerospike::Operation::READ)
end
end

end_cmd

return nil
set_query(@cluster, @policy, @statement, false, @partitions)
end

def is_scan?
4 changes: 2 additions & 2 deletions lib/aerospike/query/query_executor.rb
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ def self.query_partitions(cluster, policy, tracker, statement, recordset)
list.each do |node_partition|
threads << Thread.new do
Thread.current.abort_on_exception = true
command = QueryPartitionCommand.new(node_partition.node, tracker, policy, statement, recordset, node_partition)
command = QueryPartitionCommand.new(cluster, node_partition.node, tracker, policy, statement, recordset, node_partition)
begin
command.execute
rescue => e
@@ -48,7 +48,7 @@ def self.query_partitions(cluster, policy, tracker, statement, recordset)
else
# Use a single thread for all nodes for all node
list.each do |node_partition|
command = QueryPartitionCommand.new(node_partition.node, tracker, policy, statement, recordset, node_partition)
command = QueryPartitionCommand.new(cluster, node_partition.node, tracker, policy, statement, recordset, node_partition)
begin
command.execute
rescue => e
234 changes: 4 additions & 230 deletions lib/aerospike/query/query_partition_command.rb
Original file line number Diff line number Diff line change
@@ -21,243 +21,17 @@ module Aerospike
private

class QueryPartitionCommand < QueryCommand #:nodoc:
def initialize(node, tracker, policy, statement, recordset, node_partitions)
super(node, policy, statement, recordset, @node_partitions)
def initialize(cluster, node, tracker, policy, statement, recordset, node_partitions)
super(cluster, node, policy, statement, recordset, @node_partitions)
@node_partitions = node_partitions
@tracker = tracker
end

def write_buffer
function_arg_buffer = nil
field_count = 0
filter_size = 0
bin_name_size = 0

begin_cmd

if @statement.namespace
@data_offset += @statement.namespace.bytesize + FIELD_HEADER_SIZE
field_count += 1
end

if @statement.set_name
@data_offset += @statement.set_name.bytesize + FIELD_HEADER_SIZE
field_count += 1
end

# Estimate recordsPerSecond field size. This field is used in new servers and not used
# (but harmless to add) in old servers.
if @policy.records_per_second > 0
@data_offset += 4 + FIELD_HEADER_SIZE
field_count += 1
end

# Estimate socket timeout field size. This field is used in new servers and not used
# (but harmless to add) in old servers.
@data_offset += 4 + FIELD_HEADER_SIZE
field_count += 1

# Estimate task_id field.
@data_offset += 8 + FIELD_HEADER_SIZE
field_count += 1

filter = @statement.filters[0]
bin_names = @statement.bin_names
packed_ctx = nil

if filter
col_type = filter.collection_type

# Estimate INDEX_TYPE field.
if col_type > 0
@data_offset += FIELD_HEADER_SIZE + 1
field_count += 1
end

# Estimate INDEX_RANGE field.
@data_offset += FIELD_HEADER_SIZE
filter_size += 1 # num filters
filter_size += filter.estimate_size

@data_offset += filter_size
field_count += 1

packed_ctx = filter.packed_ctx
if packed_ctx
@data_offset += FIELD_HEADER_SIZE + packed_ctx.length
field_count += 1
end
end

@statement.set_task_id
predexp = @policy.predexp || @statement.predexp

if predexp
@data_offset += FIELD_HEADER_SIZE
pred_size = Aerospike::PredExp.estimate_size(predexp)
@data_offset += pred_size
field_count += 1
end

unless @policy.filter_exp.nil?
exp_size = estimate_expression_size(@policy.filter_exp)
field_count += 1 if exp_size > 0
end

# Estimate aggregation/background function size.
if @statement.function_name
@data_offset += FIELD_HEADER_SIZE + 1 # udf type
@data_offset += @statement.package_name.bytesize + FIELD_HEADER_SIZE
@data_offset += @statement.function_name.bytesize + FIELD_HEADER_SIZE

function_arg_buffer = ""
if @statement.function_args && @statement.function_args.length > 0
function_arg_buffer = Value.of(@statement.function_args).to_bytes
end
@data_offset += FIELD_HEADER_SIZE + function_arg_buffer.bytesize
field_count += 4
end

max_records = 0
parts_full_size = 0
parts_partial_digest_size = 0
parts_partial_bval_size = 0

unless @node_partitions.nil?
parts_full_size = @node_partitions.parts_full.length * 2
parts_partial_digest_size = @node_partitions.parts_partial.length * 20

unless filter.nil?
parts_partial_bval_size = @node_partitions.parts_partial.length * 8
end
max_records = @node_partitions.record_max
end

if parts_full_size > 0
@data_offset += parts_full_size + FIELD_HEADER_SIZE
field_count += 1
end

if parts_partial_digest_size > 0
@data_offset += parts_partial_digest_size + FIELD_HEADER_SIZE
field_count += 1
end

if parts_partial_bval_size > 0
@data_offset += parts_partial_bval_size + FIELD_HEADER_SIZE
field_count += 1
end

# Estimate max records field size. This field is used in new servers and not used
# (but harmless to add) in old servers.
if max_records > 0
@data_offset += 8 + FIELD_HEADER_SIZE
field_count += 1
end

operation_count = 0
unless bin_names.empty?
# Estimate size for selected bin names (query bin names already handled for old servers).
bin_names.each do |bin_name|
estimate_operation_size_for_bin_name(bin_name)
end
operation_count = bin_names.length
end

projected_offset = @data_offset

size_buffer

read_attr = INFO1_READ
read_attr |= INFO1_NOBINDATA if !@policy.include_bin_data
read_attr |= INFO1_SHORT_QUERY if @policy.short_query

infoAttr = INFO3_PARTITION_DONE

write_header(@policy, read_attr, 0, field_count, operation_count)

write_field_string(@statement.namespace, FieldType::NAMESPACE) if @statement.namespace
write_field_string(@statement.set_name, FieldType::TABLE) if @statement.set_name

# Write records per second.
write_field_int(@policy.records_per_second, FieldType::RECORDS_PER_SECOND) if @policy.records_per_second > 0

write_filter_exp(@policy.filter_exp, exp_size)

# Write socket idle timeout.
write_field_int(@policy.socket_timeout, FieldType::SOCKET_TIMEOUT)

# Write task_id field
write_field_int64(@statement.task_id, FieldType::TRAN_ID)

unless predexp.nil?
write_field_header(pred_size, Aerospike::FieldType::PREDEXP)
@data_offset = Aerospike::PredExp.write(
predexp, @data_buffer, @data_offset
)
end

if filter
type = filter.collection_type

if type > 0
write_field_header(1, FieldType::INDEX_TYPE)
@data_offset += @data_buffer.write_byte(type, @data_offset)
end

write_field_header(filter_size, FieldType::INDEX_RANGE)
@data_offset += @data_buffer.write_byte(1, @data_offset)
@data_offset = filter.write(@data_buffer, @data_offset)

if packed_ctx
write_field_header(packed_ctx.length, FieldType::INDEX_CONTEXT)
@data_offset += @data_buffer.write_binary(packed_ctx, @data_offset)
end
end

if @statement.function_name
write_field_header(1, FieldType::UDF_OP)
@data_offset += @data_buffer.write_byte(1, @data_offset)
write_field_string(@statement.package_name, FieldType::UDF_PACKAGE_NAME)
write_field_string(@statement.function_name, FieldType::UDF_FUNCTION)
write_field_string(function_arg_buffer, FieldType::UDF_ARGLIST)
end

if parts_full_size > 0
write_field_header(parts_full_size, FieldType::PID_ARRAY)
@node_partitions.parts_full.each do |part|
@data_offset += @data_buffer.write_uint16_little_endian(part.id, @data_offset)
end
end

if parts_partial_digest_size > 0
write_field_header(parts_partial_digest_size, FieldType::DIGEST_ARRAY)
@node_partitions.parts_partial.each do |part|
@data_offset += @data_buffer.write_binary(part.digest, @data_offset)
end
end

if parts_partial_bval_size > 0
write_field_header(parts_partial_bval_size, FieldType::BVAL_ARRAY)
@node_partitions.parts_partial.each do |part|
@data_offset += @data_buffer.write_uint64_little_endian(part.bval, @data_offset)
end
end

if max_records > 0
write_field(max_records, FieldType::MAX_RECORDS)
end

unless bin_names.empty?
bin_names.each do |bin_name|
write_operation_for_bin_name(bin_name, Operation::READ)
end
end
set_query(@cluster, @policy, @statement, false, @node_partitions)
end

end_cmd

nil
end

def should_retry(e)
# !! converts nil to false
4 changes: 2 additions & 2 deletions lib/aerospike/query/scan_executor.rb
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ def self.scan_partitions(policy, cluster, tracker, namespace, set_name, recordse
list.each do |node_partition|
threads << Thread.new do
Thread.current.abort_on_exception = true
command = ScanPartitionCommand.new(policy, tracker, node_partition, namespace, set_name, bin_names, recordset)
command = ScanPartitionCommand.new(cluster, policy, tracker, node_partition, namespace, set_name, bin_names, recordset)
begin
command.execute
rescue => e
@@ -48,7 +48,7 @@ def self.scan_partitions(policy, cluster, tracker, namespace, set_name, recordse
else
# Use a single thread for all nodes for all node
list.each do |node_partition|
command = ScanPartitionCommand.new(policy, tracker, node_partition, namespace, set_name, bin_names, recordset)
command = ScanPartitionCommand.new(cluster, policy, tracker, node_partition, namespace, set_name, bin_names, recordset)
begin
command.execute
rescue => e
6 changes: 3 additions & 3 deletions lib/aerospike/query/scan_partition_command.rb
Original file line number Diff line number Diff line change
@@ -23,9 +23,9 @@ module Aerospike

class ScanPartitionCommand < StreamCommand #:nodoc:

def initialize(policy, tracker, node_partitions, namespace, set_name, bin_names, recordset)
def initialize(cluster, policy, tracker, node_partitions, namespace, set_name, bin_names, recordset)
super(node_partitions.node)

@cluster = cluster
@policy = policy
@namespace = namespace
@set_name = set_name
@@ -36,7 +36,7 @@ def initialize(policy, tracker, node_partitions, namespace, set_name, bin_names,
end

def write_buffer
set_scan(@policy, @namespace, @set_name, @bin_names, @node_partitions)
set_scan(@cluster, @policy, @namespace, @set_name, @bin_names, @node_partitions)
end

def should_retry(e)
4 changes: 2 additions & 2 deletions lib/aerospike/query/server_command.rb
Original file line number Diff line number Diff line change
@@ -33,10 +33,10 @@ def write?
end

def write_buffer
set_query(@policy, @statement, background, nil)
set_query(@cluster, @policy, @statement, true, nil)
end

def parse_row
def parse_row(result_code)
field_count = @data_buffer.read_int16(18)
result_code = @data_buffer.read(5).ord & 0xFF
vmsachin marked this conversation as resolved.
Show resolved Hide resolved
skip_key(field_count)
4 changes: 2 additions & 2 deletions lib/aerospike/task/execute_task.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2013-2020 Aerospike, Inc.
# Copyright 2013-2023 Aerospike, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -47,7 +47,7 @@ def all_nodes_done?
end

conn = node.get_connection(0)
responseMap, _ = Info.request(conn, command)
responseMap, = Info.request(conn, command)
node.put_connection(conn)

response = responseMap[command]
2 changes: 1 addition & 1 deletion lib/aerospike/version.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# encoding: utf-8
module Aerospike
VERSION = "2.29.0"
VERSION = "3.0.0"
end
2 changes: 1 addition & 1 deletion spec/aerospike/cdt/cdt_map_spec.rb
Original file line number Diff line number Diff line change
@@ -148,7 +148,7 @@ def map_post_op
let(:map_value) { { "c" => 1, "b" => 2, "a" => 3 } }

it "sets the map order" do
new_policy = MapPolicy.new(order: MapOrder::KEY_ORDERED)
new_policy = MapPolicy.new(order: MapOrder::KEY_ORDERED, persist_index: true)
operation = MapOperation.set_policy(map_bin, new_policy)

expect { client.operate(key, [operation]) }.not_to raise_error
92 changes: 53 additions & 39 deletions spec/aerospike/exp/expression_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# encoding: utf-8
# Copyright 2014 Aerospike, Inc.
# Copyright 2014-2023 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements.
@@ -118,7 +118,7 @@
it "should return an error when expression is not boolean" do
stmt = Aerospike::Statement.new(@namespace, @set)
stmt.filters << Aerospike::Filter.Range("intval", 0, 400)
opts = { filter_exp: (Aerospike::Exp.int_val(100)) }
opts = { filter_exp: Aerospike::Exp.int_val(100) }
expect {
rs = client.query(stmt, opts)
rs.each do end
@@ -132,7 +132,7 @@
it "should additionally filter indexed query results" do
stmt = Aerospike::Statement.new(@namespace, @set)
stmt.filters << Aerospike::Filter.Range("intval", 0, 400)
opts = { filter_exp: (Aerospike::Exp.ge(Aerospike::Exp.int_bin("modval"), Aerospike::Exp.int_val(8))) }
opts = { filter_exp: Aerospike::Exp.ge(Aerospike::Exp.int_bin("modval"), Aerospike::Exp.int_val(8)) }

# The query clause selects [0, 1, ... 400, 401] The predexp
# only takes mod 8 and 9, should be 2 pre decade or 80 total.
@@ -147,7 +147,7 @@

it "should work for implied scans" do
stmt = Aerospike::Statement.new(@namespace, @set)
opts = { filter_exp: (Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0001"))) }
opts = { filter_exp: Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0001")) }

rs = client.query(stmt, opts)
count = 0
@@ -159,15 +159,15 @@

it "expression and or and not must all work" do
stmt = Aerospike::Statement.new(@namespace, @set)
opts = { filter_exp: (Aerospike::Exp.or(
opts = { filter_exp: Aerospike::Exp.or(
Aerospike::Exp.and(
Aerospike::Exp.not(Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0001"))),
Aerospike::Exp.ge(Aerospike::Exp.int_bin("modval"), Aerospike::Exp.int_val(8)),
),
Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0104")),
Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0105")),
Aerospike::Exp.eq(Aerospike::Exp.str_bin("strval"), Aerospike::Exp.str_val("0x0106")),
)) }
) }

rs = client.query(stmt, opts)
count = 0
@@ -176,6 +176,20 @@
end
expect(count).to eq 203
end

it "should query record size" do
stmt = Aerospike::Statement.new(@namespace, @set)
stmt.filters << Aerospike::Filter.Range("intval", 1, 10)
# opts = { filter_exp: Aerospike::Exp.record_size }
rs = client.query(stmt)
count = 0
rs.each do |rec|
count += 1
end
expect(count).to eq 10

end

end

context "for" do
@@ -184,7 +198,7 @@

def query_method(exp, ops = {})
stmt = Aerospike::Statement.new(@namespace, @set)
ops[:filter_exp] = (exp)
ops[:filter_exp] = exp
rs = client.query(stmt, ops)
count = 0
rs.each do |rec|
@@ -265,21 +279,21 @@ def query_method(exp, ops = {})
key = Aerospike::Key.new(@namespace, @set, 15)
opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(16),
)),
),
}
expect {
client.delete(key, opts)
}.to raise_aerospike_error(Aerospike::ResultCode::FILTERED_OUT)

opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(15),
)),
),
}
client.delete(key, opts)
end
@@ -288,21 +302,21 @@ def query_method(exp, ops = {})
key = Aerospike::Key.new(@namespace, @set, 25)
opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(15),
)),
),
}
expect {
client.put(key, { "bin" => 26 }, opts)
}.to raise_aerospike_error(Aerospike::ResultCode::FILTERED_OUT)

opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(25),
)),
),
}
client.put(key, { "bin" => 26 }, opts)
end
@@ -311,10 +325,10 @@ def query_method(exp, ops = {})
key = Aerospike::Key.new(@namespace, @set, 35)
opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(15),
)),
),
}

expect {
@@ -323,10 +337,10 @@ def query_method(exp, ops = {})

opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(35),
)),
),
}
client.get(key, ["bin"], opts)
end
@@ -335,21 +349,21 @@ def query_method(exp, ops = {})
key = Aerospike::Key.new(@namespace, @set, 45)
opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(15),
)),
),
}
expect {
client.exists(key, opts)
}.to raise_aerospike_error(Aerospike::ResultCode::FILTERED_OUT)

opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(45),
)),
),
}
client.exists(key, opts)
end
@@ -358,21 +372,21 @@ def query_method(exp, ops = {})
key = Aerospike::Key.new(@namespace, @set, 55)
opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(15),
)),
),
}
expect {
client.add(key, { "test55" => "test" }, opts)
}.to raise_aerospike_error(Aerospike::ResultCode::FILTERED_OUT)

opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(55),
)),
),
}
client.add(key, { "test55" => "test" }, opts)
end
@@ -381,21 +395,21 @@ def query_method(exp, ops = {})
key = Aerospike::Key.new(@namespace, @set, 55)
opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(15),
)),
),
}
expect {
client.prepend(key, { "test55" => "test" }, opts)
}.to raise_aerospike_error(Aerospike::ResultCode::FILTERED_OUT)

opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(55),
)),
),
}
client.prepend(key, { "test55" => "test" }, opts)
end
@@ -404,32 +418,32 @@ def query_method(exp, ops = {})
key = Aerospike::Key.new(@namespace, @set, 65)
opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(15),
)),
),
}
expect {
client.touch(key, opts)
}.to raise_aerospike_error(Aerospike::ResultCode::FILTERED_OUT)

opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(65),
)),
),
}
client.touch(key, opts)
end

it "should Scan" do
opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.eq(
filter_exp: Exp.eq(
Exp.int_bin("bin"),
Exp.int_val(75),
)),
),
}

rs = client.scan_all(@namespace, @set, nil, opts)
@@ -606,7 +620,7 @@ def query_method(exp, ops = {})
it "#{title} should work" do
opts = {
fail_on_filtered_out: true,
filter_exp: (exp),
filter_exp: exp,
}

expect {
@@ -617,7 +631,7 @@ def query_method(exp, ops = {})

opts = {
fail_on_filtered_out: true,
filter_exp: (Exp.not(exp)),
filter_exp: Exp.not(exp),
} if reverse_exp
r = client.get(exp_key, nil, opts)
client.get(key)
89 changes: 89 additions & 0 deletions spec/aerospike/query_blob_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# encoding: utf-8
# Copyright 2014-2023 Aerospike, Inc.
#
# Portions may be licensed to Aerospike, Inc. under one or more contributor
# license agreements.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

require "aerospike/query/statement"

describe 'TestQueryBlob' do
before(:all) do
@index_name = 'qbindex'
@bin_name = 'bb'
@index_name_list = 'qblist'
@bin_name_list = 'bblist'
@size = 5
@namespace = 'test'
@set = 'query-blob'

Support.client.drop_index(@namespace, @set, @index_name)
Support.client.drop_index(@namespace, @set, @index_name_list)
Support.client.create_index(@namespace, @set, @index_name, @bin_name, :blob)
Support.client.create_index(@namespace, @set, @index_name_list, @bin_name_list, :blob, :list)

(1..@size).each do |i|
bytes = bytes_to_str([0b00000001, 0b01000010])
blob = Aerospike::BytesValue.new(bytes)
blob_list = [blob]

key = Aerospike::Key.new(@namespace, @set, i)
bin = Aerospike::Bin.new(@bin_name, blob)
bin_list = Aerospike::Bin.new(@bin_name_list, blob_list)
Support.client.put(key, [bin, bin_list])
end
end

def bytes_to_str(bytes)
bytes.pack("C*").force_encoding("binary")
end

it 'should query blob' do
bytes = bytes_to_str([0b00000001, 0b01000010])
blob = Aerospike::BytesValue.new(bytes)

stmt = Aerospike::Statement.new('test', 'query-blob', ['bb'])
stmt.filters << Aerospike::Filter.Equal('bb', blob)
rs = Support.client.query(stmt)

begin
count = 0

rs.each do |record|
result = Aerospike::BytesValue.new(record.bins['bb'])
expect(result.to_bytes).to eq(blob.to_bytes)
count += 1
end
expect(count).not_to eq(0)
end
end

it 'should query blob in list' do
bytes = bytes_to_str([0b00000001, 0b01000010])
blob = Aerospike::BytesValue.new(bytes)

stmt = Aerospike::Statement.new('test', 'query-blob', ['bblist'])
stmt.filters << Aerospike::Filter.Contains('bblist', blob, :list)
rs = Support.client.query(stmt)

begin
count = 0

rs.each do |record|
result_list = record.bins['bblist']
expect(result_list.size).to eq(1)
count += 1
end
expect(count).not_to eq(0)
end
end
end
2 changes: 1 addition & 1 deletion spec/aerospike/query_spec.rb
Original file line number Diff line number Diff line change
@@ -182,7 +182,7 @@
end

expect(i).to eq record_count
expect((Time.now - tm).to_i).to be >= 1
expect((Time.now - tm).to_i).to be >= 0

end # it