From fd936d53330d686bf20e50823f55c956e287eead Mon Sep 17 00:00:00 2001 From: Khosrow Afroozeh Date: Fri, 9 Aug 2024 17:31:40 +0200 Subject: [PATCH] [CLIENT_1731] Support Batch Operations --- lib/aerospike.rb | 13 + lib/aerospike/batch_attr.rb | 292 ++++++++++++++++++ lib/aerospike/batch_delete.rb | 48 +++ lib/aerospike/batch_read.rb | 97 ++++++ lib/aerospike/batch_record.rb | 83 +++++ lib/aerospike/batch_results.rb | 38 +++ lib/aerospike/batch_udf.rb | 76 +++++ lib/aerospike/batch_write.rb | 79 +++++ lib/aerospike/client.rb | 33 ++ lib/aerospike/cluster.rb | 96 +++--- lib/aerospike/command/batch_index_node.rb | 7 +- .../command/batch_operate_command.rb | 151 +++++++++ lib/aerospike/command/batch_operate_node.rb | 51 +++ lib/aerospike/command/command.rb | 93 +++++- lib/aerospike/node.rb | 10 +- lib/aerospike/operation.rb | 37 +++ lib/aerospike/policy/batch_delete_policy.rb | 71 +++++ lib/aerospike/policy/batch_policy.rb | 54 +++- lib/aerospike/policy/batch_read_policy.rb | 38 +++ lib/aerospike/policy/batch_udf_policy.rb | 75 +++++ lib/aerospike/policy/batch_write_policy.rb | 105 +++++++ lib/aerospike/utils/buffer.rb | 30 +- spec/aerospike/batch_operate_spec.rb | 185 +++++++++++ 23 files changed, 1689 insertions(+), 73 deletions(-) create mode 100644 lib/aerospike/batch_attr.rb create mode 100644 lib/aerospike/batch_delete.rb create mode 100644 lib/aerospike/batch_read.rb create mode 100644 lib/aerospike/batch_record.rb create mode 100644 lib/aerospike/batch_results.rb create mode 100644 lib/aerospike/batch_udf.rb create mode 100644 lib/aerospike/batch_write.rb create mode 100644 lib/aerospike/command/batch_operate_command.rb create mode 100644 lib/aerospike/command/batch_operate_node.rb create mode 100644 lib/aerospike/policy/batch_delete_policy.rb create mode 100644 lib/aerospike/policy/batch_read_policy.rb create mode 100644 lib/aerospike/policy/batch_udf_policy.rb create mode 100644 lib/aerospike/policy/batch_write_policy.rb create mode 100644 spec/aerospike/batch_operate_spec.rb diff --git a/lib/aerospike.rb b/lib/aerospike.rb index b1323ce4..b6cc426c 100644 --- a/lib/aerospike.rb +++ b/lib/aerospike.rb @@ -44,6 +44,7 @@ require "aerospike/value/value" require "aerospike/command/single_command" require "aerospike/command/batch_index_node" +require "aerospike/command/batch_operate_node" require "aerospike/command/field_type" require "aerospike/command/command" require "aerospike/command/execute_command" @@ -53,6 +54,7 @@ require "aerospike/command/exists_command" require "aerospike/command/multi_command" require "aerospike/command/batch_index_command" +require "aerospike/command/batch_operate_command" require "aerospike/command/batch_index_exists_command" require "aerospike/command/read_header_command" require "aerospike/command/touch_command" @@ -94,6 +96,10 @@ require "aerospike/policy/generation_policy" require "aerospike/policy/policy" require "aerospike/policy/batch_policy" +require "aerospike/policy/batch_delete_policy" +require "aerospike/policy/batch_read_policy" +require "aerospike/policy/batch_udf_policy" +require "aerospike/policy/batch_write_policy" require "aerospike/policy/write_policy" require "aerospike/policy/scan_policy" require "aerospike/policy/query_policy" @@ -102,6 +108,13 @@ require "aerospike/policy/admin_policy" require "aerospike/policy/auth_mode" +require "aerospike/batch_record" +require "aerospike/batch_attr" +require "aerospike/batch_read" +require "aerospike/batch_write" +require "aerospike/batch_delete" +require "aerospike/batch_udf" + require "aerospike/socket/base" require "aerospike/socket/ssl" require "aerospike/socket/tcp" diff --git a/lib/aerospike/batch_attr.rb b/lib/aerospike/batch_attr.rb new file mode 100644 index 00000000..c6c8f449 --- /dev/null +++ b/lib/aerospike/batch_attr.rb @@ -0,0 +1,292 @@ +# frozen_string_literal: true + +# Copyright 2014-2020 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +module Aerospike + + class BatchAttr + + attr_reader :filter_exp, :read_attr, :write_attr, :info_attr, :expiration, :generation, :has_write, :send_key + + def initialize(ops = nil, opt = {}) + rp = create_policy(opt, BatchPolicy, nil) + wp = create_policy(opt, BatchWritePolicy, nil) + + read_all_bins = false + read_header = false + has_read = false + has_write_op = false + + ops&.each do |op| + case op.op_type + when Operation::BIT_READ, Operation::EXP_READ, Operation::HLL_READ, Operation::CDT_READ, Operation::READ # Read all bins if no bin is specified. + read_all_bins = op.bin_name.nil? + has_read = true + + when Operation::READ_HEADER + read_header = true + has_read = true + + else + has_write_op = true + end + end + + if has_write_op + set_batch_write(wp) + + if has_read + @read_attr |= Aerospike::INFO1_READ + + if read_all_bins + @read_attr |= Aerospike::INFO1_GET_ALL + elsif read_header + @read_attr |= Aerospike::INFO1_NOBINDATA + end + end + else + set_batch_read(rp) + + if read_all_bins + @read_attr |= Aerospike::INFO1_GET_ALL + elsif read_header + @read_attr |= Aerospike::INFO1_NOBINDATA + end + end + end + + def set_read(rp) + @filter_exp = nil + @read_attr = Aerospike::INFO1_READ + + @write_attr = 0 + @info_attr = 0 + + @expiration = 0 + @generation = 0 + @has_write = false + @send_key = false + end + + def set_batch_read(rp) + @filter_exp = rp.filter_exp + @read_attr = Aerospike::INFO1_READ + + @write_attr = 0 + @info_attr = 0 + + @expiration = 0 + @generation = 0 + @has_write = false + @send_key = false + end + + def adjust_read(ops) + read_all_bins = false + read_header = false + + ops.each do |op| + case op.op_type + when Operation::BIT_READ, Operation::EXP_READ, Operation::HLL_READ, Operation::CDT_READ, Operation::READ # Read all bins if no bin is specified. + read_all_bins = op.bin_name.nil? + when Operation::READ_HEADER + read_header = true + end + end + + if read_all_bins + @read_attr |= Aerospike::INFO1_GET_ALL + elsif read_header + @read_attr |= Aerospike::INFO1_NOBINDATA + end + end + + def adjust_read_all_bins(read_all_bins) + @read_attr |= read_all_bins ? Aerospike::INFO1_GET_ALL : Aerospike::INFO1_NOBINDATA + end + + def set_write(wp) + @filter_exp = nil + @read_attr = 0 + @write_attr = Aerospike::INFO2_WRITE | Aerospike::INFO2_RESPOND_ALL_OPS + @info_attr = 0 + @expiration = 0 + @generation = 0 + @has_write = true + @send_key = wp.send_key + end + + def set_batch_write(wp) + @filter_exp = wp.filter_exp + @read_attr = 0 + @write_attr = Aerospike::INFO2_WRITE | Aerospike::INFO2_RESPOND_ALL_OPS + @info_attr = 0 + @expiration = wp.expiration + @has_write = true + @send_key = wp.send_key + + case wp.generation_policy + when GenerationPolicy::NONE + @generation = 0 + when GenerationPolicy::EXPECT_GEN_EQUAL + @generation = wp.generation + @write_attr |= Aerospike::INFO2_GENERATION + when GenerationPolicy::EXPECT_GEN_GT + @generation = wp.generation + @write_attr |= Aerospike::INFO2_GENERATION_GT + else + @generation = 0 + end + + case wp.record_exists_action + when RecordExistsAction::UPDATE + # NOOP + when RecordExistsAction::UPDATE_ONLY + @info_attr |= Aerospike::INFO3_UPDATE_ONLY + when RecordExistsAction::REPLACE + @info_attr |= Aerospike::INFO3_CREATE_OR_REPLACE + when RecordExistsAction::REPLACE_ONLY + @info_attr |= Aerospike::INFO3_REPLACE_ONLY + when RecordExistsAction::CREATE_ONLY + @write_attr |= Aerospike::INFO2_CREATE_ONLY + end + + if wp.durable_delete + @write_attr |= Aerospike::INFO2_DURABLE_DELETE + end + + if wp.commit_level == CommitLevel::COMMIT_MASTER + @info_attr |= Aerospike::INFO3_COMMIT_MASTER + end + end + + def adjust_write(ops) + read_all_bins = false + read_header = false + has_read = false + + ops.each do |op| + case op.op_type + when Operation::BIT_READ, Operation::EXP_READ, Operation::HLL_READ, Operation::CDT_READ, Operation::READ # Read all bins if no bin is specified. + read_all_bins = op.bin_name.nil? + has_read = true + + when Operation::READ_HEADER + read_header = true + has_read = true + + end + end + + if has_read + @read_attr |= Aerospike::INFO1_READ + + if read_all_bins + @read_attr |= Aerospike::INFO1_GET_ALL + elsif read_header + @read_attr |= Aerospike::INFO1_NOBINDATA + end + end + end + + def set_udf(up) + @filter_exp = nil + @read_attr = 0 + @write_attr = Aerospike::INFO2_WRITE + @info_attr = 0 + @expiration = 0 + @generation = 0 + @has_write = true + @send_key = up.send_key + end + + def set_batch_udf(up) + @filter_exp = up.filter_exp + @read_attr = 0 + @write_attr = Aerospike::INFO2_WRITE + @info_attr = 0 + @expiration = up.expiration + @generation = 0 + @has_write = true + @send_key = up.send_key + + if up.durable_delete + @write_attr |= Aerospike::INFO2_DURABLE_DELETE + end + + if up.commit_level == CommitLevel::COMMIT_MASTER + @info_attr |= Aerospike::INFO3_COMMIT_MASTER + end + end + + def set_delete(dp) + @filter_exp = nil + @read_attr = 0 + @write_attr = Aerospike::INFO2_WRITE | Aerospike::INFO2_RESPOND_ALL_OPS | Aerospike::INFO2_DELETE + @info_attr = 0 + @expiration = 0 + @generation = 0 + @has_write = true + @send_key = dp.send_key + end + + def set_batch_delete(dp) + @filter_exp = dp.filter_exp + @read_attr = 0 + @write_attr = Aerospike::INFO2_WRITE | Aerospike::INFO2_RESPOND_ALL_OPS | Aerospike::INFO2_DELETE + @info_attr = 0 + @expiration = 0 + @has_write = true + @send_key = dp.send_key + + case dp.generation_policy + when GenerationPolicy::NONE + @generation = 0 + when GenerationPolicy::EXPECT_GEN_EQUAL + @generation = dp.generation + @write_attr |= Aerospike::INFO2_GENERATION + when GenerationPolicy::EXPECT_GEN_GT + @generation = dp.generation + @write_attr |= Aerospike::INFO2_GENERATION_GT + else + @generation = 0 + end + + if dp.durable_delete + @write_attr |= Aerospike::INFO2_DURABLE_DELETE + end + + if dp.commit_level == CommitLevel::COMMIT_MASTER + @info_attr |= Aerospike::INFO3_COMMIT_MASTER + end + end + + private + + def create_policy(policy, policy_klass, default_policy = nil) + case policy + when nil + default_policy || policy_klass.new + when policy_klass + policy + when Hash + policy_klass.new(policy) + else + raise TypeError, "policy should be a #{policy_klass.name} instance or a Hash" + end + end + end +end diff --git a/lib/aerospike/batch_delete.rb b/lib/aerospike/batch_delete.rb new file mode 100644 index 00000000..f67f1824 --- /dev/null +++ b/lib/aerospike/batch_delete.rb @@ -0,0 +1,48 @@ +# encoding: utf-8 +# Copyright 2014-2024 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License") you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +module Aerospike + + # Batch delete operation. + class BatchDelete < BatchRecord + # Optional delete policy. + attr_accessor :policy + + # Initialize policy and key. + def initialize(key, opt = {}) + super(key, has_write: true) + @policy = BatchRecord.create_policy(opt, BatchDeletePolicy, DEFAULT_BATCH_DELETE_POLICY) + end + + def ==(other) + other && other.instance_of?(self.class) && @policy == other.policy + end + + DEFAULT_BATCH_DELETE_POLICY = BatchDeletePolicy.new + + # Return wire protocol size. For internal use only. + def size + size = 6 # gen(2) + exp(4) = 6 + + size += @policy&.filter_exp&.size if @policy&.filter_exp + if @policy&.send_key + size += @key.user_key.estimate_size + Aerospike::FIELD_HEADER_SIZE + 1 + end + + size + end + end +end \ No newline at end of file diff --git a/lib/aerospike/batch_read.rb b/lib/aerospike/batch_read.rb new file mode 100644 index 00000000..9a62cb7a --- /dev/null +++ b/lib/aerospike/batch_read.rb @@ -0,0 +1,97 @@ +# encoding: utf-8 +# Copyright 2014-2024 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License") you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +# Batch key and read only operations with default policy. +# Used in batch read commands where different bins are needed for each key. + +module Aerospike + + class BatchRead < BatchRecord + + # Optional read policy. + attr_accessor :policy + + # Bins to retrieve for this key. bin_names are mutually exclusive with + # {@link com.aerospike.client.BatchRead#ops}. + attr_accessor :bin_names + + # Optional operations for this key. ops are mutually exclusive with + # {@link com.aerospike.client.BatchRead#bin_names}. A bin_name can be emulated with + # {@link com.aerospike.client.Operation#get(String)} + attr_accessor :ops + + # If true, ignore bin_names and read all bins. + # If false and bin_names are set, read specified bin_names. + # If false and bin_names are not set, read record header (generation, expiration) only. + attr_accessor :read_all_bins + + # Initialize batch key and bins to retrieve. + def self.read_bins(key, bin_names, opt = {}) + br = BatchRead.new(key) + br.policy = BatchRecord.create_policy(opt, BatchReadPolicy, DEFAULT_BATCH_READ_POLICY) + br.bin_names = bin_names + br.read_all_bins = false + br + end + + # Initialize batch key and read_all_bins indicator. + def self.read_all_bins(key, opt = {}) + br = BatchRead.new(key) + br.policy = create_policy(opt, BatchReadPolicy, DEFAULT_BATCH_READ_POLICY) + br.read_all_bins = true + br + end + + # Initialize batch key and read operations. + def self.ops(key, ops, opt = {}) + br = BatchRead.new(key) + br.policy = create_policy(opt, BatchReadPolicy, DEFAULT_BATCH_READ_POLICY) + br.ops = ops + br.read_all_bins = false + br + end + + # Optimized reference equality check to determine batch wire protocol repeat flag. + # For internal use only. + def ==(other) + other && other.instance_of?(self.class) && + @bin_names.sort == other.bin_names.sort && @ops.sort == other.ops.sort && + @policy == other.policy && @read_all_bins == other.read_all_bins + end + + DEFAULT_BATCH_READ_POLICY = BatchReadPolicy.new + + # Return wire protocol size. For internal use only. + def size + size = 0 + size += @policy&.filter_exp&.size if @policy&.filter_exp + + @bin_names&.each do |bin_name| + size += bin_name.bytesize + Aerospike::OPERATION_HEADER_SIZE + end + + @ops&.each do |op| + if op.is_write? + raise AerospikeException.new(ResultCode::PARAMETER_ERROR, "Write operations not allowed in batch read") + end + size += op.bin_name.bytesize + Aerospike::OPERATION_HEADER_SIZE + size += op.value.estimate_size + end + + size + end + end +end diff --git a/lib/aerospike/batch_record.rb b/lib/aerospike/batch_record.rb new file mode 100644 index 00000000..898aa7ad --- /dev/null +++ b/lib/aerospike/batch_record.rb @@ -0,0 +1,83 @@ +# encoding: utf-8 +# Copyright 2014-2024 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License") you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +module Aerospike + + # Batch key and record result. + class BatchRecord + # Key. + attr_reader :key + + # Record result after batch command has completed. Will be null if record was not found + # or an error occurred. See {@link BatchRecord#result_code}. + attr_reader :record + + # Result code for this returned record. See {@link com.aerospike.client.ResultCode}. + # If not {@link com.aerospike.client.ResultCode#OK}, the record will be null. + attr_accessor :result_code + + # Is it possible that the write transaction may have completed even though an error + # occurred for this record. This may be the case when a client error occurs (like timeout) + # after the command was sent to the server. + attr_accessor :in_doubt + + # Does this command contain a write operation. For internal use only. + attr_reader :has_write + + # Constructor. + def initialize(key, result_code: ResultCode::NO_RESPONSE, in_doubt: false, has_write: false) + @key = key + @record = record + @result_code = result_code + @in_doubt = in_doubt + @has_write = has_write + end + + def self.create_policy(policy, policy_klass, default_policy = nil) + case policy + when nil + default_policy || policy_klass.new + when policy_klass + policy + when Hash + policy_klass.new(policy) + else + raise TypeError, "policy should be a #{policy_klass.name} instance or a Hash" + end + end + + # Prepare for upcoming batch call. Reset result fields because this instance might be + # reused. For internal use only. + def prepare + @record = nil + @result_code = ResultCode::NO_RESPONSE + @in_doubt = false + end + + # Set record result. For internal use only. + def record=(record) + @record = record + @result_code = ResultCode::OK + end + + # Set error result. For internal use only. + def set_error(result_code, in_doubt) + @result_code = result_code + @in_doubt = in_doubt + end + + end +end diff --git a/lib/aerospike/batch_results.rb b/lib/aerospike/batch_results.rb new file mode 100644 index 00000000..ae71041d --- /dev/null +++ b/lib/aerospike/batch_results.rb @@ -0,0 +1,38 @@ +# encoding: utf-8 +# Copyright 2014-2024 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License") you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +# Batch key and read only operations with default policy. +# Used in batch read commands where different bins are needed for each key. + +module Aerospike + + # Batch record results. + class BatchResults + + # Record results. + attr_accessor :records + + # Indicates if all records returned success. + attr_accessor :status + + # Constructor. + def intialize(records, status) + @records = records + @status = status + end + + end +end diff --git a/lib/aerospike/batch_udf.rb b/lib/aerospike/batch_udf.rb new file mode 100644 index 00000000..22bdddd0 --- /dev/null +++ b/lib/aerospike/batch_udf.rb @@ -0,0 +1,76 @@ +# encoding: utf-8 +# Copyright 2014-2024 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License") you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +# Batch key and read only operations with default policy. +# Used in batch read commands where different bins are needed for each key. + +module Aerospike + + # Batch user defined functions. + class BatchUDF < BatchRecord + + # Optional UDF policy. + attr_accessor :policy + + # Package or lua module name. + attr_accessor :package_name + + # Lua function name. + attr_accessor :function_name + + # Optional arguments to lua function. + attr_accessor :function_args + + # Wire protocol bytes for function args. For internal use only. + attr_reader :arg_bytes + + # Constructor using default policy. + def initialize(key, package_name, function_name, function_args, opt = {}) + super(key, has_write: true) + @policy = BatchRecord.create_policy(opt, BatchUDFPolicy, DEFAULT_BATCH_UDF_POLICY) + @package_name = package_name + @function_name = function_name + @function_args = ListValue.new(function_args) + # Do not set arg_bytes here because may not be necessary if batch repeat flag is used. + end + + # Optimized reference equality check to determine batch wire protocol repeat flag. + # For internal use only. + def equals(other) + other && other.instance_of?(self.class) && + @function_name == other.function_name && @function_args == other.function_args && + @package_name == other.package_name && @policy == other.policy + end + + DEFAULT_BATCH_UDF_POLICY = BatchUDFPolicy.new + + # Return wire protocol size. For internal use only. + def size + size = 6 # gen(2) + exp(4) = 6 + + size += @policy&.filter_exp&.size if @policy&.filter_exp + + if @policy&.send_key + size += @key.user_key.estimate_size + Aerospike::FIELD_HEADER_SIZE + 1 + end + size += @package_name.bytesize + Aerospike::FIELD_HEADER_SIZE + size += @function_name.bytesize + Aerospike::FIELD_HEADER_SIZE + @arg_bytes = @function_args.to_bytes + size += @arg_bytes.bytesize + Aerospike::FIELD_HEADER_SIZE + size + end + end +end \ No newline at end of file diff --git a/lib/aerospike/batch_write.rb b/lib/aerospike/batch_write.rb new file mode 100644 index 00000000..74b5520b --- /dev/null +++ b/lib/aerospike/batch_write.rb @@ -0,0 +1,79 @@ +# encoding: utf-8 +# Copyright 2014-2024 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License") you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +# Batch key and read only operations with default policy. +# Used in batch read commands where different bins are needed for each key. + +module Aerospike + + private + + # Batch key and read/write operations with write policy. + class BatchWrite < BatchRecord + # Optional write policy. + attr_accessor :policy + + # Required operations for this key. + attr_accessor :ops + + # Initialize batch key and read/write operations. + #

+ # {@link Operation#get()} is not allowed because it returns a variable number of bins and + # makes it difficult (sometimes impossible) to lineup operations with results. Instead, + # use {@link Operation#get(String)} for each bin name. + def initialize(key, ops, opt = {}) + super(key, has_write: true) + @policy = BatchRecord.create_policy(opt, BatchWritePolicy, DEFAULT_BATCH_WRITE_POLICY) + @ops = ops + end + + # Optimized reference equality check to determine batch wire protocol repeat flag. + # For internal use only. + def ==(other) + other && other.instance_of?(self.class) && + @ops == other.ops && @policy == other.policy && (@policy.nil? || !@policy.send_key) + end + + DEFAULT_BATCH_WRITE_POLICY = BatchWritePolicy.new + + # Return wire protocol size. For internal use only. + def size + size = 6 # gen(2) + exp(4) = 6 + + size += @policy&.filter_exp&.size if @policy&.filter_exp + + if @policy&.send_key + size += @key.user_key.estimate_size + Aerospike::FIELD_HEADER_SIZE + 1 + end + + has_write = false + @ops&.each do |op| + if op.is_write? + has_write = true + end + + size += op.bin_name.bytesize + Aerospike::OPERATION_HEADER_SIZE if op.bin_name + size += op.bin_value.estimate_size if op.bin_value + end + + unless has_write + raise AerospikeException.new(ResultCode::PARAMETER_ERROR, "Batch write operations do not contain a write") + end + + size + end + end +end \ No newline at end of file diff --git a/lib/aerospike/client.rb b/lib/aerospike/client.rb index d3a8fa1c..b1a58c8a 100644 --- a/lib/aerospike/client.rb +++ b/lib/aerospike/client.rb @@ -336,6 +336,21 @@ def batch_get_header(keys, options = nil) batch_get(keys, :none, options) end + # Operate on multiple records for specified batch keys in one batch call. + # This method allows different namespaces/bins for each key in the batch. + # The returned records are located in the same list. + # + # records can be BatchRead, BatchWrite, BatchDelete or BatchUDF. + # + # Requires server version 6.0+ + def batch_operate(records, options = nil) + policy = create_policy(options, BatchPolicy, default_batch_policy) + + execute_batch_operate_commands(policy, records) do |node, batch| + BatchOperateCommand.new(node, batch, policy, records) + end + end + # Check if multiple record keys exist in one batch call. # The returned boolean array is in positional order with the original key array order. # The policy can be used to specify timeouts and protocol type. @@ -975,5 +990,23 @@ def execute_batch_index_commands(policy, keys) threads.each(&:join) end + + def execute_batch_operate_commands(policy, records) + if @cluster.nodes.empty? + raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing Batch Index command failed because cluster is empty.") + end + + batch_nodes = BatchOperateNode.generate_list(@cluster, policy.replica, records) + threads = [] + + batch_nodes.each do |batch| + threads << Thread.new do + command = yield batch.node, batch + execute_command(command) + end + end + + threads.each(&:join) + end end # class end # module diff --git a/lib/aerospike/cluster.rb b/lib/aerospike/cluster.rb index 643d3d04..f5f5b4c4 100644 --- a/lib/aerospike/cluster.rb +++ b/lib/aerospike/cluster.rb @@ -120,15 +120,15 @@ def connected? # Returns a node on the cluster for read operations def batch_read_node(partition, replica_policy) case replica_policy - when Aerospike::Replica::MASTER, Aerospike::Replica::SEQUENCE - return master_node(partition) - when Aerospike::Replica::MASTER_PROLES - return master_proles_node(partition) - when Aerospike::Replica::PREFER_RACK - return rack_node(partition, seq) - when Aerospike::Replica::RANDOM - return random_node - else + when Aerospike::Replica::MASTER, Aerospike::Replica::SEQUENCE + master_node(partition) + when Aerospike::Replica::MASTER_PROLES + master_proles_node(partition) + when Aerospike::Replica::PREFER_RACK + rack_node(partition, seq) + when Aerospike::Replica::RANDOM + random_node + else raise Aerospike::Exceptions::InvalidNode("invalid policy.replica value") end end @@ -136,17 +136,17 @@ def batch_read_node(partition, replica_policy) # Returns a node on the cluster for read operations def read_node(partition, replica_policy, seq) case replica_policy - when Aerospike::Replica::MASTER - return master_node(partition) - when Aerospike::Replica::MASTER_PROLES - return master_proles_node(partition) - when Aerospike::Replica::PREFER_RACK - return rack_node(partition, seq) - when Aerospike::Replica::SEQUENCE - return sequence_node(partition, seq) - when Aerospike::Replica::RANDOM - return random_node - else + when Aerospike::Replica::MASTER + master_node(partition) + when Aerospike::Replica::MASTER_PROLES + master_proles_node(partition) + when Aerospike::Replica::PREFER_RACK + rack_node(partition, seq) + when Aerospike::Replica::SEQUENCE + sequence_node(partition, seq) + when Aerospike::Replica::RANDOM + random_node + else raise Aerospike::Exceptions::InvalidNode("invalid policy.replica value") end end @@ -155,12 +155,12 @@ def read_node(partition, replica_policy, seq) def master_node(partition) partition_map = partitions replica_array = partition_map[partition.namespace] - raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") if !replica_array + raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array - node_array = (replica_array.get)[0] - raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") if !node_array + node_array = replica_array.get[0] + raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless node_array - node = (node_array.get)[partition.partition_id] + node = node_array.get[partition.partition_id] raise Aerospike::Exceptions::InvalidNode if !node || !node.active? node @@ -170,7 +170,7 @@ def master_node(partition) def rack_node(partition, seq) partition_map = partitions replica_array = partition_map[partition.namespace] - raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") if !replica_array + raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array replica_array = replica_array.get @@ -179,10 +179,10 @@ def rack_node(partition, seq) node = nil fallback = nil for i in 1..replica_array.length - idx = (seq.update{|v| v.succ} % replica_array.size).abs - node = (replica_array[idx].get)[partition.partition_id] + idx = (seq.update { |v| v.succ } % replica_array.size).abs + node = replica_array[idx].get[partition.partition_id] - next if !node + next unless node fallback = node @@ -202,14 +202,14 @@ def rack_node(partition, seq) def master_proles_node(partition) partition_map = partitions replica_array = partition_map[partition.namespace] - raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") if !replica_array + raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array replica_array = replica_array.get node = nil for replica in replica_array - idx = (@replica_index.update{|v| v.succ} % replica_array.size).abs - node = (replica_array[idx].get)[partition.partition_id] + idx = (@replica_index.update { |v| v.succ } % replica_array.size).abs + node = replica_array[idx].get[partition.partition_id] return node if node && node.active? end @@ -221,14 +221,14 @@ def master_proles_node(partition) def sequence_node(partition, seq) partition_map = partitions replica_array = partition_map[partition.namespace] - raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") if !replica_array + raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array replica_array = replica_array.get node = nil for replica in replica_array - idx = (seq.update{|v| v.succ} % replica_array.size).abs - node = (replica_array[idx].get)[partition.partition_id] + idx = (seq.update { |v| v.succ } % replica_array.size).abs + node = replica_array[idx].get[partition.partition_id] return node if node && node.active? end @@ -236,9 +236,13 @@ def sequence_node(partition, seq) raise Aerospike::Exceptions::InvalidNode end - def get_node_for_key(replica_policy, key) + def get_node_for_key(replica_policy, key, is_write: false) partition = Partition.new_by_key(key) - batch_read_node(partition, replica_policy) + if is_write + master_node(partition) + else + batch_read_node(partition, replica_policy) + end end # Returns partitions pertaining to a node @@ -247,10 +251,10 @@ def node_partitions(node, namespace) partition_map = partitions replica_array = partition_map[namespace] - raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") if !replica_array + raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless replica_array - node_array = (replica_array.get)[0] - raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") if !node_array + node_array = replica_array.get[0] + raise Aerospike::Exceptions::InvalidNamespace("namespace not found in the partition map") unless node_array pid = 0 @@ -270,7 +274,7 @@ def random_node i = 0 while i < length # Must handle concurrency with other non-tending threads, so node_index is consistent. - idx = (@node_index.update{ |v| v.succ } % node_array.length).abs + idx = (@node_index.update { |v| v.succ } % node_array.length).abs node = node_array[idx] return node if node.active? @@ -366,13 +370,13 @@ def launch_tend_thread @tend_thread = Thread.new do Thread.current.abort_on_exception = false loop do - begin + tend sleep(@tend_interval / 1000.0) - rescue => e + rescue => e Aerospike.logger.error("Exception occured during tend: #{e}") Aerospike.logger.debug { e.backtrace.join("\n") } - end + end end end @@ -453,7 +457,7 @@ def refresh_nodes def log_tend_stats(nodes) diff = nodes.size - @old_node_count - action = "#{diff.abs} #{diff.abs == 1 ? "node has" : "nodes have"} #{diff > 0 ? "joined" : "left"} the cluster." + action = "#{diff.abs} #{diff.abs == 1 ? 'node has' : 'nodes have'} #{diff > 0 ? 'joined' : 'left'} the cluster." Aerospike.logger.info("Tend finished. #{action} Old node count: #{@old_node_count}, New node count: #{nodes.size}") @old_node_count = nodes.size end @@ -689,11 +693,11 @@ def remove_nodes_copy(nodes_to_remove) end def node_exists(search, node_list) - node_list.any? {|node| node == search } + node_list.any? { |node| node == search } end def find_node_by_name(node_name) - nodes.detect{|node| node.name == node_name } + nodes.detect { |node| node.name == node_name } end end end diff --git a/lib/aerospike/command/batch_index_node.rb b/lib/aerospike/command/batch_index_node.rb index 8cb93d28..33514076 100644 --- a/lib/aerospike/command/batch_index_node.rb +++ b/lib/aerospike/command/batch_index_node.rb @@ -19,13 +19,12 @@ module Aerospike class BatchIndexNode #:nodoc: - attr_accessor :node - attr_accessor :keys_by_idx + attr_accessor :node, :keys_by_idx def self.generate_list(cluster, replica_policy, keys) keys.each_with_index - .group_by { |key, _| cluster.get_node_for_key(replica_policy, key) } - .map { |node, keys_with_idx| BatchIndexNode.new(node, keys_with_idx) } + .group_by { |key, _| cluster.get_node_for_key(replica_policy, key) } + .map { |node, keys_with_idx| BatchIndexNode.new(node, keys_with_idx) } end def initialize(node, keys_with_idx) diff --git a/lib/aerospike/command/batch_operate_command.rb b/lib/aerospike/command/batch_operate_command.rb new file mode 100644 index 00000000..db816e83 --- /dev/null +++ b/lib/aerospike/command/batch_operate_command.rb @@ -0,0 +1,151 @@ +# Copyright 2018 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +require 'aerospike/command/multi_command' + +module Aerospike + + class BatchOperateCommand < MultiCommand #:nodoc: + + attr_accessor :batch, :policy, :attr, :records + + def initialize(node, batch, policy, records) + super(node) + @batch = batch + @policy = policy + @records = records + end + + def batch_flags + flags = 0 + # flags |= 0x1 if @policy.allow_inline + flags |= 0x2 if @policy.allow_inline_ssd + flags |= 0x4 if @policy.respond_all_keys + flags + end + + def write_buffer + field_count = 1 + + exp_size = estimate_expression_size(@policy.filter_exp) + @data_offset += exp_size + field_count += 1 if exp_size > 0 + + @data_buffer.reset + begin_cmd + @data_offset += FIELD_HEADER_SIZE + 4 + 1 # batch.keys.length + flags + + prev = nil + @records.each do |record| + key = record.key + @data_offset += key.digest.length + 4 # 4 byte batch offset + + if !@policy.send_key && !prev.nil? && prev.key.namespace == key.namespace && prev.key.set_name == key.set_name && record == prev + @data_offset += 1 + else + @data_offset += 12 + @data_offset += key.namespace.bytesize + FIELD_HEADER_SIZE + @data_offset += key.set_name.bytesize + FIELD_HEADER_SIZE + @data_offset += record.size + end + + prev = record + end + size_buffer + write_batch_header(policy, field_count) + + write_filter_exp(@policy.filter_exp, exp_size) + + field_size_offset = @data_offset + + write_field_header(0, Aerospike::FieldType::BATCH_INDEX) + @data_offset += @data_buffer.write_int32(batch.records.length, @data_offset) + @data_offset += @data_buffer.write_byte(batch_flags, @data_offset) + + prev = nil + attr = BatchAttr.new + batch.records.each_with_index do |record, index| + @data_offset += @data_buffer.write_int32(index, @data_offset) + key = record.key + @data_offset += @data_buffer.write_binary(key.digest, @data_offset) + + if !@policy.send_key && !prev.nil? && prev.key.namespace == key.namespace && prev.key.set_name == key.set_name && record == prev + @data_offset += @data_buffer.write_byte(BATCH_MSG_REPEAT, @data_offset) + else + case record + when BatchRead + attr.set_batch_read(record.policy) + if record.bin_names&.length&.> 0 + write_batch_bin_names(key, record.bin_names, attr, attr.filter_exp) + elsif record.ops&.length&.> 0 + attr.adjust_read(br.ops) + write_batch_operations(key, record.ops, attr, attr.filter_exp) + else + attr.adjust_read_all_bins(record.read_all_bins) + write_batch_read(key, attr, attr.filter_exp, 0) + end + + when BatchWrite + attr.set_batch_write(record.policy) + attr.adjust_write(record.ops) + write_batch_operations(key, record.ops, attr, attr.filter_exp) + + when BatchUDF + attr.set_batch_udf(record.policy) + write_batch_write(key, attr, attr.filter_exp, 3, 0) + write_field_string(record.package_name, Aerospike::FieldType::UDF_PACKAGE_NAME) + write_field_string(record.function_name, Aerospike::FieldType::UDF_FUNCTION) + write_field_bytes(record.arg_bytes, Aerospike::FieldType::UDF_ARGLIST) + + when BatchDelete + attr.set_batch_delete(record.policy) + write_batch_write(key, attr, attr.filter_exp, 0, 0) + end + + prev = record + end + end + + @data_buffer.write_uint32(@data_offset-MSG_TOTAL_HEADER_SIZE-4, field_size_offset) + + end_cmd + mark_compressed(@policy) + end + + # Parse all results in the batch. Add records to shared list. + # If the record was not found, the bins will be nil. + def parse_row(result_code) + generation = @data_buffer.read_int32(6) + expiration = @data_buffer.read_int32(10) + batch_index = @data_buffer.read_int32(14) + field_count = @data_buffer.read_int16(18) + op_count = @data_buffer.read_int16(20) + + skip_key(field_count) + req_key = records[batch_index].key + + records[batch_index].result_code = result_code + case result_code + when 0, ResultCode::UDF_BAD_RESPONSE + record = parse_record(req_key, op_count, generation, expiration) + records[batch_index].record = record + end + end + + end # class + +end # module diff --git a/lib/aerospike/command/batch_operate_node.rb b/lib/aerospike/command/batch_operate_node.rb new file mode 100644 index 00000000..1905ed6c --- /dev/null +++ b/lib/aerospike/command/batch_operate_node.rb @@ -0,0 +1,51 @@ +# Copyright 2018 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +module Aerospike + + class BatchOperateNode #:nodoc: + + attr_accessor :node, :records_by_idx + + def self.generate_list(cluster, replica_policy, records) + records.each_with_index + .group_by { |record, _| cluster.get_node_for_key(replica_policy, record.key, is_write: record.has_write) } + .map { |node, records_with_idx| BatchOperateNode.new(node, records_with_idx) } + end + + def initialize(node, records_with_idx) + @node = node + @records_by_idx = records_with_idx.map(&:reverse).to_h + end + + def records + records_by_idx.values + end + + def each_record_with_index + records_by_idx.each do |idx, rec| + yield rec, idx + end + end + + def record_for_index(idx) + @records_by_idx[idx] + end + + end + +end diff --git a/lib/aerospike/command/command.rb b/lib/aerospike/command/command.rb index 05807a3d..03da4bcb 100644 --- a/lib/aerospike/command/command.rb +++ b/lib/aerospike/command/command.rb @@ -76,6 +76,12 @@ module Aerospike # Completely replace existing record only. INFO3_REPLACE_ONLY = Integer(1 << 5) + BATCH_MSG_READ = 0x0 + BATCH_MSG_REPEAT = 0x1 + BATCH_MSG_INFO = 0x2 + BATCH_MSG_GEN = 0x4 + BATCH_MSG_TTL = 0x8 + MSG_TOTAL_HEADER_SIZE = 30 FIELD_HEADER_SIZE = 5 OPERATION_HEADER_SIZE = 8 @@ -958,7 +964,7 @@ def write_header_read(policy, read_attr, info_attr, field_count, operation_count @data_buffer.write_byte(0, 10) @data_buffer.write_byte(info_attr, 11) - (12...22).each { |i| @data_buffer.write_byte(i, 0) } + (12...22).each { |i| @data_buffer.write_byte(0, i) } # Initialize timeout. It will be written later. @data_buffer.write_byte(0, 22) @@ -981,7 +987,7 @@ def write_header_read_header(policy, read_attr, field_count, operation_count) @data_buffer.write_byte(0, 10) @data_buffer.write_byte(info_attr, 11) - (12...22).each { |i| @data_buffer.write_byte(i, 0) } + (12...22).each { |i| @data_buffer.write_byte(0, i) } # Initialize timeout. It will be written later. @data_buffer.write_byte(0, 22) @@ -995,6 +1001,89 @@ def write_header_read_header(policy, read_attr, field_count, operation_count) @data_offset = MSG_TOTAL_HEADER_SIZE end + def write_batch_operations(key, ops, attr, filter_exp) + if attr.has_write + write_batch_write(key, attr, filter_exp, 0, ops.length) + else + write_batch_read(key, attr, filter_exp, ops.length) + end + + ops.each do |op| + write_operation_for_operation(op) + end + end + + def write_batch_fields(key, field_count, op_count) + field_count+=2 + @data_offset += @data_buffer.write_uint16(field_count, @data_offset) + @data_offset += @data_buffer.write_uint16(op_count, @data_offset) + write_field_string(key.namespace, Aerospike::FieldType::NAMESPACE) + write_field_string(key.set_name, Aerospike::FieldType::TABLE) + end + + def write_batch_fields_with_filter(key, filter_exp, field_count, op_count) + if filter_exp + field_count+=1 + write_batch_fields(key, field_count, op_count) + write_filter_exp(filter_exp, filter_exp.size) + else + write_batch_fields(key, field_count, op_count) + end + end + + def write_batch_read(key, attr, filter_exp, op_count) + @data_offset += @data_buffer.write_byte(BATCH_MSG_INFO | BATCH_MSG_TTL, @data_offset) + @data_offset += @data_buffer.write_byte(attr.read_attr, @data_offset) + @data_offset += @data_buffer.write_byte(attr.write_attr, @data_offset) + @data_offset += @data_buffer.write_byte(attr.info_attr, @data_offset) + @data_offset += @data_buffer.write_uint32(attr.expiration, @data_offset) + write_batch_fields_with_filter(key, filter_exp, 0, op_count) + end + + def write_batch_write(key, attr, filter_exp, field_count, op_count) + @data_offset += @data_buffer.write_byte(BATCH_MSG_INFO | BATCH_MSG_GEN | BATCH_MSG_TTL, @data_offset) + @data_offset += @data_buffer.write_byte(attr.read_attr, @data_offset) + @data_offset += @data_buffer.write_byte(attr.write_attr, @data_offset) + @data_offset += @data_buffer.write_byte(attr.info_attr, @data_offset) + @data_offset += @data_buffer.write_uint16(attr.generation, @data_offset) + @data_offset += @data_buffer.write_uint32(attr.expiration, @data_offset) + if attr.send_key + field_count+=1 + write_batch_fields_with_filter(key, filter_exp, field_count, op_count) + write_field_value(key.user_key, KEY) + else + write_batch_fields_with_filter(key, filter_exp, field_count, op_count) + end + end + + def write_batch_bin_names(key, bin_names, attr, filter_exp) + write_batch_read(key, attr, filter_exp, bin_names.length) + bin_names.each do |bin_name| + write_operation_for_bin_name(bin_name, Aerospike::Operation::READ) + end + end + + def write_batch_header(policy, field_count) + read_attr = INFO1_BATCH + read_attr |= INFO1_COMPRESS_RESPONSE if policy.use_compression + #TODO: Add SC Mode + + @data_buffer.write_byte(MSG_REMAINING_HEADER_SIZE, 8) # Message header.length, @data_offset. + @data_buffer.write_byte(read_attr, 9) + @data_buffer.write_byte(0, 10) + @data_buffer.write_byte(0, 11) + + (12...22).each { |i| @data_buffer.write_byte(0, i) } + + # Initialize timeout. It will be written later. + @data_buffer.write_uint32(0, 22) + + @data_buffer.write_uint16(field_count, 26) + @data_buffer.write_uint16(0, 28) + + @data_offset = MSG_TOTAL_HEADER_SIZE + end + def write_key(key, policy = nil) # Write key into buffer. if key.namespace diff --git a/lib/aerospike/node.rb b/lib/aerospike/node.rb index f95b9bb1..b2799e7b 100644 --- a/lib/aerospike/node.rb +++ b/lib/aerospike/node.rb @@ -71,6 +71,10 @@ def query_show? (@features & HAS_QUERY_SHOW) != 0 end + def batch_any? + (@features & HAS_BATCH_ANY) != 0 + end + def update_racks(parser) new_racks = parser.update_racks @racks.value = new_racks if new_racks @@ -78,7 +82,7 @@ def update_racks(parser) def has_rack(ns, rack_id) racks = @racks.value - return false if !racks + return false unless racks racks[ns] == rack_id end @@ -108,7 +112,7 @@ def get_connection(timeout) # Put back a connection to the cache. If cache is full, the connection will be # closed and discarded def put_connection(conn) - conn.close if !active? + conn.close unless active? @connections.offer(conn) end @@ -236,7 +240,7 @@ def refresh_partitions(peers) Node::Refresh::Partitions.(self, peers) end - def refresh_racks() + def refresh_racks Node::Refresh::Racks.(self) end diff --git a/lib/aerospike/operation.rb b/lib/aerospike/operation.rb index 974110c6..b72fe388 100644 --- a/lib/aerospike/operation.rb +++ b/lib/aerospike/operation.rb @@ -80,5 +80,42 @@ def self.touch def self.delete Operation.new(DELETE) end + + def is_write? + case @op_type + when READ + false + when READ_HEADER + false + when WRITE + true + when CDT_READ + false + when CDT_MODIFY + true + when ADD + true + when EXP_READ + false + when EXP_MODIFY + true + when APPEND + true + when PREPEND + true + when TOUCH + true + when BIT_READ + false + when BIT_MODIFY + true + when DELETE + true + when HLL_READ + false + when HLL_MODIFY + true + end + end end end # module diff --git a/lib/aerospike/policy/batch_delete_policy.rb b/lib/aerospike/policy/batch_delete_policy.rb new file mode 100644 index 00000000..8372cdee --- /dev/null +++ b/lib/aerospike/policy/batch_delete_policy.rb @@ -0,0 +1,71 @@ +# encoding: utf-8 +# Copyright 2014-2024 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License") you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +module Aerospike + + # Policy attributes used in batch delete commands. + class BatchDeletePolicy + attr_accessor :filter_exp, :commit_level, :generation_policy, :generation, :durable_delete, :send_key + + def initialize(opt = {}) + # Optional expression filter. If filterExp exists and evaluates to false, the specific batch key + # request is not performed and {@link com.aerospike.client.BatchRecord#result_code} is set to + # {@link com.aerospike.client.ResultCode#FILTERED_OUT}. + # + # If exists, this filter overrides the batch parent filter {@link com.aerospike.client.policy.Policy#filter_exp} + # for the specific key in batch commands that allow a different policy per key. + # Otherwise, this filter is ignored. + # + # Default: nil + @filter_exp = opt[:filter_exp] + + # Desired consistency guarantee when committing a transaction on the server. The default + # (COMMIT_ALL) indicates that the server should wait for master and all replica commits to + # be successful before returning success to the client. + # + # Default: CommitLevel.COMMIT_ALL + @commit_level = opt[:commit_level] || CommitLevel::COMMIT_ALL + + # Qualify how to handle record deletes based on record generation. The default (NONE) + # indicates that the generation is not used to restrict deletes. + # + # Default: GenerationPolicy.NONE + @generation_policy = opt[:generation_policy] || GenerationPolicy::NONE + + # Expected generation. Generation is the number of times a record has been modified + # (including creation) on the server. This field is only relevant when generationPolicy + # is not NONE. + # + # Default: 0 + @generation = opt[:generation] || 0 + + # If the transaction results in a record deletion, leave a tombstone for the record. + # This prevents deleted records from reappearing after node failures. + # Valid for Aerospike Server Enterprise Edition only. + # + # Default: false (do not tombstone deleted records). + @durable_delete = opt[:durable_delete] || false + + # Send user defined key in addition to hash digest. + # If true, the key will be stored with the tombstone record on the server. + # + # Default: false (do not send the user defined key) + @send_key = opt[:send_key] || false + + self + end + end +end \ No newline at end of file diff --git a/lib/aerospike/policy/batch_policy.rb b/lib/aerospike/policy/batch_policy.rb index c54247ab..b9233413 100644 --- a/lib/aerospike/policy/batch_policy.rb +++ b/lib/aerospike/policy/batch_policy.rb @@ -21,9 +21,10 @@ module Aerospike # Container object for batch policy command. class BatchPolicy < Policy + attr_accessor :allow_inline_ssd, :respond_all_keys, :send_key def initialize(opt={}) - super(opt) + super # [:nodoc:] # DEPRECATED @@ -39,11 +40,58 @@ def initialize(opt={}) # index protocol will perform this record proxy when necessary. # # Default: false (use new batch index protocol if server supports it) - @use_batch_direct = opt.fetch(:use_batch_direct) { false } - + @use_batch_direct = opt.fetch(:use_batch_direct, false) + + + # Allow batch to be processed immediately in the server's receiving thread for SSD + # namespaces. If false, the batch will always be processed in separate service threads. + # Server versions < 6.0 ignore this field. + # + # Inline processing can introduce the possibility of unfairness because the server + # can process the entire batch before moving onto the next command. + # + # Default: false + @allow_inline_ssd = opt.fetch(:allow_inline_ssd, false) + + + # Should all batch keys be attempted regardless of errors. This field is used on both + # the client and server. The client handles node specific errors and the server handles + # key specific errors. + # + # If true, every batch key is attempted regardless of previous key specific errors. + # Node specific errors such as timeouts stop keys to that node, but keys directed at + # other nodes will continue to be processed. + # + # If false, the server will stop the batch to its node on most key specific errors. + # The exceptions are {@link com.aerospike.client.ResultCode#KEY_NOT_FOUND_ERROR} and + # {@link com.aerospike.client.ResultCode#FILTERED_OUT} which never stop the batch. + # The client will stop the entire batch on node specific errors. The client will + # not stop the entire batch commands run in parallel. + # + # Server versions < 6.0 do not support this field and treat this value as false + # for key specific errors. + # + # Default: true + @respond_all_keys = opt.fetch(:respond_all_keys, true) + + + # Send user defined key in addition to hash digest on a record put. + # The default is to _not_ send the user defined key. + @send_key = opt.fetch(:send_key, false) + self end + def self.read_default + BatchPolicy.new + end + + def self.write_default + bp = BatchPolicy.new + bp.max_retries = 0 + bp + end + end # class end # module diff --git a/lib/aerospike/policy/batch_read_policy.rb b/lib/aerospike/policy/batch_read_policy.rb new file mode 100644 index 00000000..309534b3 --- /dev/null +++ b/lib/aerospike/policy/batch_read_policy.rb @@ -0,0 +1,38 @@ +# Copyright 2014-2020 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +module Aerospike + + # Policy attributes used in batch read commands. + class BatchReadPolicy + + attr_accessor :filter_exp + + def initialize(opt={}) + # Optional expression filter. If filterExp exists and evaluates to false, the specific batch key + # request is not performed and {@link com.aerospike.client.BatchRecord#result_code} is set to + # {@link com.aerospike.client.ResultCode#FILTERED_OUT}. + # + # If exists, this filter overrides the batch parent filter {@link com.aerospike.client.policy.Policy#filter_exp} + # for the specific key in batch commands that allow a different policy per key. + # Otherwise, this filter is ignored. + # + # Default: nil + @filter_exp = opt[:filter_exp] + end + end +end \ No newline at end of file diff --git a/lib/aerospike/policy/batch_udf_policy.rb b/lib/aerospike/policy/batch_udf_policy.rb new file mode 100644 index 00000000..f1f014b2 --- /dev/null +++ b/lib/aerospike/policy/batch_udf_policy.rb @@ -0,0 +1,75 @@ +# Copyright 2014-2023 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +module Aerospike + + # Policy attributes used in batch UDF execute commands. + class BatchUDFPolicy + + attr_accessor :filter_exp, :commit_level, :ttl, :durable_delete, :send_key + + alias expiration ttl + alias expiration= ttl= + + def initialize(opt={}) + # Optional expression filter. If filterExp exists and evaluates to false, the specific batch key + # request is not performed and {@link com.aerospike.client.BatchRecord#resultCode} is set to + # {@link com.aerospike.client.ResultCode#FILTERED_OUT}. + # + # If exists, this filter overrides the batch parent filter {@link com.aerospike.client.policy.Policy#filterExp} + # for the specific key in batch commands that allow a different policy per key. + # Otherwise, this filter is ignored. + # + # Default: nil + @filter_exp = opt[:filter_exp] + + # Desired consistency guarantee when committing a transaction on the server. The default + # (COMMIT_ALL) indicates that the server should wait for master and all replica commits to + # be successful before returning success to the client. + # + # Default: CommitLevel::COMMIT_ALL + @commit_level = opt.fetch(:commit_level, CommitLevel::COMMIT_ALL) + + # Record expiration; also known as time-to-live (TTL). + # Seconds record will live before being removed by the server. + # + # Supported values: + # - `Aerospike::TTL::NEVER_EXPIRE`: Never expire record; requires Aerospike 2 + # server versions >= 2.7.2 or Aerospike 3 server versions >= 3.1.4. Do + # not use for older servers. + # - `Aerospike::TTL::NAMESPACE_DEFAULT`: Default to namespace configuration + # variable "default-ttl" on the server. + # - `Aerospike::TTL::DONT_UPDATE`: Do not change a record's expiration date + # when updating the record. Requires Aerospike server v3.10.1 or later. + # - Any value > 0: Actual time-to-live in seconds. + @ttl = opt[:ttl] || opt[:expiration] || 0 + + # If the transaction results in a record deletion, leave a tombstone for the record. + # This prevents deleted records from reappearing after node failures. + # Valid for Aerospike Server Enterprise Edition only. + # + # Default: false (do not tombstone deleted records). + @durable_delete = opt.fetch(:durable_delete, false) + + # Send user defined key in addition to hash digest. + # If true, the key will be stored with the record on the server. + # + # Default: false (do not send the user defined key) + @send_key = opt.fetch(:send_key, false) + end + end +end \ No newline at end of file diff --git a/lib/aerospike/policy/batch_write_policy.rb b/lib/aerospike/policy/batch_write_policy.rb new file mode 100644 index 00000000..c17e56c5 --- /dev/null +++ b/lib/aerospike/policy/batch_write_policy.rb @@ -0,0 +1,105 @@ +# Copyright 2014-2023 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +module Aerospike + + + # Policy attributes used in batch write commands. + class BatchWritePolicy + + attr_accessor :filter_exp, :record_exists_action, :commit_level, + :generation_policy, :generation, :ttl, :durable_delete, + :send_key + + alias expiration ttl + alias expiration= ttl= + + def initialize(opt={}) + # Optional expression filter. If filterExp exists and evaluates to false, the specific batch key + # request is not performed and {@link com.aerospike.client.BatchRecord#resultCode} is set to + # {@link com.aerospike.client.ResultCode#FILTERED_OUT}. + # + # If exists, this filter overrides the batch parent filter {@link com.aerospike.client.policy.Policy#filter_exp} + # for the specific key in batch commands that allow a different policy per key. + # Otherwise, this filter is ignored. + # + # Default: nil + @filter_exp = opt[:filter_exp] + + # Qualify how to handle writes where the record already exists. + # + # Default: RecordExistsAction::UPDATE + @record_exists_action = opt.fetch(:record_exists_action, RecordExistsAction::UPDATE) + + # Desired consistency guarantee when committing a transaction on the server. The default + # (COMMIT_ALL) indicates that the server should wait for master and all replica commits to + # be successful before returning success to the client. + # + # Default: CommitLevel::COMMIT_ALL + @commit_level = opt.fetch(:commit_level, CommitLevel::COMMIT_ALL) + + # Qualify how to handle record writes based on record generation. The default (NONE) + # indicates that the generation is not used to restrict writes. + # + # The server does not support this field for UDF execute() calls. The read-modify-write + # usage model can still be enforced inside the UDF code itself. + # + # Default: GenerationPolicy::NONE + @generation_policy = opt.fetch(:generation_policy, GenerationPolicy::NONE) + + # Expected generation. Generation is the number of times a record has been modified + # (including creation) on the server. If a write operation is creating a record, + # the expected generation would be 0. This field is only relevant when + # generationPolicy is not NONE. + # + # The server does not support this field for UDF execute() calls. The read-modify-write + # usage model can still be enforced inside the UDF code itself. + # + # Default: 0 + @generation = opt.fetch(:generation, 0) + + # Record expiration; also known as time-to-live (TTL). + # Seconds record will live before being removed by the server. + # + # Supported values: + # - `Aerospike::TTL::NEVER_EXPIRE`: Never expire record; requires Aerospike 2 + # server versions >= 2.7.2 or Aerospike 3 server versions >= 3.1.4. Do + # not use for older servers. + # - `Aerospike::TTL::NAMESPACE_DEFAULT`: Default to namespace configuration + # variable "default-ttl" on the server. + # - `Aerospike::TTL::DONT_UPDATE`: Do not change a record's expiration date + # when updating the record. Requires Aerospike server v3.10.1 or later. + # - Any value > 0: Actual time-to-live in seconds. + @ttl = opt[:ttl] || opt[:expiration] || 0 + + # If the transaction results in a record deletion, leave a tombstone for the record. + # This prevents deleted records from reappearing after node failures. + # Valid for Aerospike Server Enterprise Edition only. + # + # Default: false (do not tombstone deleted records). + @durable_delete = opt.fetch(:durable_delete, false) + + # Send user defined key in addition to hash digest. + # If true, the key will be stored with the record on the server. + # + # Default: false (do not send the user defined key) + @send_key = opt.fetch(:send_key, false) + + self + end + end +end \ No newline at end of file diff --git a/lib/aerospike/utils/buffer.rb b/lib/aerospike/utils/buffer.rb index d14c757b..0ed7d62f 100644 --- a/lib/aerospike/utils/buffer.rb +++ b/lib/aerospike/utils/buffer.rb @@ -25,7 +25,7 @@ module Aerospike # Buffer class to ease the work around class Buffer #:nodoc: @@buf_pool = Pool.new - @@buf_pool.create_proc = Proc.new { Buffer.new } + @@buf_pool.create_proc = proc { Buffer.new } attr_accessor :buf @@ -43,7 +43,7 @@ class Buffer #:nodoc: MAX_BUFFER_SIZE = 10 * 1024 * 1024 def initialize(size = DEFAULT_BUFFER_SIZE, buf = nil) - @buf = (buf ? buf : ("%0#{size}d" % 0)) + @buf = buf || format("%0#{size}d", 0) @buf.force_encoding("binary") @slice_end = @buf.bytesize end @@ -60,7 +60,7 @@ def size @buf.bytesize end - alias_method :length, :size + alias length size def eat!(n) @buf.replace(@buf[n..-1]) @@ -74,7 +74,7 @@ def resize(length) end if @buf.bytesize < length - @buf.concat("%0#{length - @buf.bytesize}d" % 0) + @buf.concat(format("%0#{length - @buf.bytesize}d", 0)) end @slice_end = length end @@ -144,37 +144,37 @@ def read(offset, len = nil) def read_int16(offset) vals = @buf[offset..offset + 1] - vals.unpack(INT16)[0] + vals.unpack1(INT16) end def read_uint16(offset) vals = @buf[offset..offset + 1] - vals.unpack(UINT16)[0] + vals.unpack1(UINT16) end def read_int32(offset) vals = @buf[offset..offset + 3] - vals.unpack(INT32)[0] + vals.unpack1(INT32) end def read_uint32(offset) vals = @buf[offset..offset + 3] - vals.unpack(UINT32)[0] + vals.unpack1(UINT32) end def read_int64(offset) vals = @buf[offset..offset + 7] - vals.unpack(INT64)[0] + vals.unpack1(INT64) end def read_uint64_little_endian(offset) vals = @buf[offset..offset + 7] - vals.unpack(UINT64LE)[0] + vals.unpack1(UINT64LE) end def read_uint64(offset) vals = @buf[offset..offset + 7] - vals.unpack(UINT64)[0] + vals.unpack1(UINT64) end def read_var_int64(offset, len) @@ -190,7 +190,7 @@ def read_var_int64(offset, len) def read_double(offset) vals = @buf[offset..offset + 7] - vals.unpack(DOUBLE)[0] + vals.unpack1(DOUBLE) end def read_bool(offset, length) @@ -219,13 +219,13 @@ def dump(start = 0, finish = nil) @buf.bytes[start...finish].each do |c| if counter >= start print "%02x " % c - ascii << (c.between?(32, 126) ? c : ?.) - print " " if ascii.length == (width / 2 + 1) + ascii << (c.between?(32, 126) ? c : '.') + print " " if ascii.length == ((width / 2) + 1) if ascii.length > width ascii << "|" puts ascii ascii = "|" - print "%08x " % (counter + 1) + print format("%08x ", (counter + 1)) end end counter += 1 diff --git a/spec/aerospike/batch_operate_spec.rb b/spec/aerospike/batch_operate_spec.rb new file mode 100644 index 00000000..a165f8e0 --- /dev/null +++ b/spec/aerospike/batch_operate_spec.rb @@ -0,0 +1,185 @@ +# encoding: utf-8 +# Copyright 2014 Aerospike, Inc. +# +# Portions may be licensed to Aerospike, Inc. under one or more contributor +# license agreements. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +require 'aerospike' +require 'aerospike/batch_read' + +describe Aerospike::Client do + + let(:client) { Support.client } + + describe "#batch_operate" do + let(:batch_policy) do + Aerospike::BatchPolicy.new + end + + let(:existing_keys) { Array.new(3) { Support.gen_random_key } } + # let(:existing_keys) { [Aerospike::Key.new("test", "test", 1)] } + let(:keys) { existing_keys } + let(:no_such_key) { Support.gen_random_key } + let(:keys) { existing_keys } + let(:opts) { { filter_exp: Aerospike::Exp.eq(Aerospike::Exp.int_bin("strval"), Aerospike::Exp.int_val(0)) } } + + before do + existing_keys.each_with_index do |key, idx| + client.put(key, { + 'idx' => idx, + 'key' => key.user_key, + 'rnd' => 99 #rand + }, {}) + end + end + + context '#BatchRead' do + it 'returns specified bins' do + bin_names = %w[idx rnd] + records = [Aerospike::BatchRead.read_bins(keys.first, bin_names)] + client.batch_operate(records, batch_policy) + + expect(records[0].result_code).to eql 0 + expect(records[0].record.bins.length).to eql 2 + end + + it 'returns all records' do + records = [Aerospike::BatchRead.read_all_bins(keys.first)] + client.batch_operate(records, batch_policy) + + expect(records[0].result_code).to eql 0 + expect(records[0].record.bins.length).to eql 3 + end + + it 'filter out' do + records = [Aerospike::BatchRead.read_all_bins(keys.first)] + client.batch_operate(records, opts) + + expect(records[0].result_code).to eql Aerospike::ResultCode::FILTERED_OUT + expect(records[0].record).to eql nil + end + end + + context '#BatchWrite' do + it 'updates specified bins' do + ops = [ + Aerospike::Operation.put(Aerospike::Bin.new("new_bin_str", "value")), + Aerospike::Operation.put(Aerospike::Bin.new("new_bin_int", 999)), + Aerospike::Operation.add(Aerospike::Bin.new("new_bin_int", 1)) + ] + records = [Aerospike::BatchWrite.new(keys.first, ops)] + client.batch_operate(records, batch_policy) + + expect(records[0].result_code).to eql 0 + expect(records[0].record.bins).to eql({ "new_bin_int"=>nil, "new_bin_str"=>nil }) + + records = [Aerospike::BatchRead.read_all_bins(keys.first)] + client.batch_operate(records, batch_policy) + + expect(records[0].record.bins).to eql({ + 'idx' => 0, + 'key' => keys.first.user_key, + 'rnd' => 99, #rand + "new_bin_str" => "value", + "new_bin_int" => 1000 + }) + end + + it 'filter out' do + ops = [ + Aerospike::Operation.put(Aerospike::Bin.new("new_bin_str", "value")), + Aerospike::Operation.put(Aerospike::Bin.new("new_bin_int", 999)), + Aerospike::Operation.add(Aerospike::Bin.new("new_bin_int", 1)) + ] + records = [Aerospike::BatchWrite.new(keys.first, ops)] + client.batch_operate(records, opts) + + expect(records[0].result_code).to eql Aerospike::ResultCode::FILTERED_OUT + expect(records[0].record).to eql nil + end + + it 'removes specific records' do + ops = [ + Aerospike::Operation.delete + ] + records = [Aerospike::BatchWrite.new(keys.first, ops)] + client.batch_operate(records, batch_policy) + + exists = client.exists(keys.first) + expect(exists).to eql false + + end + end + + context '#BatchDelete' do + it 'removes specific records' do + ops = [ + Aerospike::Operation.delete + ] + records = [Aerospike::BatchDelete.new(keys.first)] + client.batch_operate(records, batch_policy) + + expect(records[0].result_code).to eql 0 + expect(records[0].record.bins).to eql nil + + exists = client.exists(keys.first) + expect(exists).to eql false + end + + it 'filter out' do + ops = [ + Aerospike::Operation.delete + ] + records = [Aerospike::BatchDelete.new(keys.first)] + client.batch_operate(records, opts) + + expect(records[0].result_code).to eql Aerospike::ResultCode::FILTERED_OUT + expect(records[0].record).to eql nil + end + + end # context + + context '#BatchUDF' do + let(:udf_body_string) do + "function testStr(rec, str) + return str -- Return the Return value and/or status + end" + end + + before do + register_task = client.register_udf(udf_body_string, "test-udf-batch.lua", Aerospike::Language::LUA) + expect(register_task.wait_till_completed).to be true + expect(register_task.completed?).to be true + end + + it 'calls specific UDF' do + records = [Aerospike::BatchUDF.new(keys.first, "test-udf-batch", "testStr", ["ping_str"])] + client.batch_operate(records, batch_policy) + + expect(records[0].result_code).to eql 0 + expect(records[0].record.bins).to eql({ "SUCCESS"=>"ping_str" }) + end + + it 'filter out' do + records = [Aerospike::BatchUDF.new(keys.first, "test-udf-batch", "testStr", ["ping_str"])] + client.batch_operate(records, opts) + + expect(records[0].result_code).to eql Aerospike::ResultCode::FILTERED_OUT + expect(records[0].record).to eql nil + end + + end # context + + end # describe + +end # describe