From 7f9d758a56acea6efdc82774ffe1cd08e36b5af6 Mon Sep 17 00:00:00 2001 From: Mike Moore Date: Fri, 25 May 2018 16:24:20 -0600 Subject: [PATCH] Add Subscriber Async Unary Pusher Instead of sending Ack/ModAck messages on the stream, send normal RCP messages instead. This relieves some GRPC stream congestion. Modifies the Async Stream Pusher batch to make RPC calls instead. --- ...async_pusher.rb => async_stream_pusher.rb} | 13 +- .../pubsub/subscriber/async_unary_pusher.rb | 250 ++++++++++++++++++ .../google/cloud/pubsub/subscriber/stream.rb | 19 +- 3 files changed, 260 insertions(+), 22 deletions(-) rename google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/{async_pusher.rb => async_stream_pusher.rb} (95%) create mode 100644 google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/async_unary_pusher.rb diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/async_pusher.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/async_stream_pusher.rb similarity index 95% rename from google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/async_pusher.rb rename to google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/async_stream_pusher.rb index f689ac677c94..fdba88fc4ba2 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/async_pusher.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/async_stream_pusher.rb @@ -22,9 +22,9 @@ module Pubsub class Subscriber ## # @private - # # AsyncPusher + # # AsyncStreamPusher # - class AsyncPusher + class AsyncStreamPusher include MonitorMixin attr_reader :batch @@ -108,9 +108,7 @@ def stop @background_thread.kill if @background_thread end - return nil if @batch.nil? - - @batch.request + push_batch_request! end def started? @@ -147,10 +145,7 @@ def run_background def push_batch_request! return unless @batch - request = @batch.request - Concurrent::Future.new(executor: @stream.push_thread_pool) do - @stream.push request - end.execute + @stream.push @batch.request @batch = nil @batch_created_at = nil diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/async_unary_pusher.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/async_unary_pusher.rb new file mode 100644 index 000000000000..c2b7c09f4243 --- /dev/null +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/async_unary_pusher.rb @@ -0,0 +1,250 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +require "monitor" +require "concurrent" + +module Google + module Cloud + module Pubsub + class Subscriber + ## + # @private + # # AsyncUnaryPusher + # + class AsyncUnaryPusher + include MonitorMixin + + attr_reader :batch + attr_reader :max_bytes, :interval + + def initialize stream, max_bytes: 10000000, interval: 1.0 + @stream = stream + + @max_bytes = max_bytes + @interval = interval + + @cond = new_cond + + # init MonitorMixin + super() + end + + def acknowledge ack_ids + return true if ack_ids.empty? + + synchronize do + ack_ids.each do |ack_id| + if @batch.nil? + @batch = Batch.new max_bytes: @max_bytes + @batch.ack ack_id + else + unless @batch.try_ack ack_id + push_batch_request! + + @batch = Batch.new max_bytes: @max_bytes + @batch.ack ack_id + end + end + + @batch_created_at ||= Time.now + @background_thread ||= Thread.new { run_background } + + push_batch_request! if @batch.ready? + end + + @cond.signal + end + + nil + end + + def delay deadline, ack_ids + return true if ack_ids.empty? + + synchronize do + ack_ids.each do |ack_id| + if @batch.nil? + @batch = Batch.new max_bytes: @max_bytes + @batch.delay deadline, ack_id + else + unless @batch.try_delay deadline, ack_id + push_batch_request! + + @batch = Batch.new max_bytes: @max_bytes + @batch.delay deadline, ack_id + end + end + + @batch_created_at ||= Time.now + @background_thread ||= Thread.new { run_background } + + push_batch_request! if @batch.ready? + end + + @cond.signal + end + + nil + end + + def stop + synchronize do + @stopped = true + + # Stop any background activity, clean up happens in wait! + @background_thread.kill if @background_thread + end + + push_batch_request! + end + + def started? + !stopped? + end + + def stopped? + synchronize { @stopped } + end + + protected + + def run_background + synchronize do + until @stopped + if @batch.nil? + @cond.wait + next + end + + time_since_first_publish = Time.now - @batch_created_at + if time_since_first_publish > @interval + # interval met, publish the batch... + push_batch_request! + @cond.wait + else + # still waiting for the interval to publish the batch... + @cond.wait(@interval - time_since_first_publish) + end + end + end + end + + def push_batch_request! + return unless @batch + + service = @stream.subscriber.service + name = @stream.subscriber.subscription_name + + if @batch.ack? + ack_ids = @batch.ack_ids + Concurrent::Future.new(executor: @stream.push_thread_pool) do + service.acknowledge name, *ack_ids + end.execute + end + if @batch.delay? + @batch.modify_deadline_hash.each do |delay_seconds, delay_ack_ids| + Concurrent::Future.new(executor: @stream.push_thread_pool) do + service.modify_ack_deadline name, delay_ack_ids, delay_seconds + end.execute + end + end + + @batch = nil + @batch_created_at = nil + end + + class Batch + attr_reader :max_bytes, :request + + def initialize max_bytes: 10000000 + @max_bytes = max_bytes + @request = Google::Pubsub::V1::StreamingPullRequest.new + @total_message_bytes = 0 + end + + def ack ack_id + @request.ack_ids << ack_id + @total_message_bytes += addl_ack_bytes ack_id + end + + def try_ack ack_id + addl_bytes = addl_ack_bytes ack_id + return false if total_message_bytes + addl_bytes >= @max_bytes + + ack ack_id + true + end + + def addl_ack_bytes ack_id + ack_id.bytesize + 2 + end + + def delay deadline, ack_id + @request.modify_deadline_seconds << deadline + @request.modify_deadline_ack_ids << ack_id + @total_message_bytes += addl_delay_bytes deadline, ack_id + end + + def try_delay deadline, ack_id + addl_bytes = addl_delay_bytes deadline, ack_id + return false if total_message_bytes + addl_bytes >= @max_bytes + + delay deadline, ack_id + true + end + + def addl_delay_bytes deadline, ack_id + bytes_for_int(deadline) + ack_id.bytesize + 4 + end + + def bytes_for_int num + # Ruby 2.0 does not have Integer#bit_length + return [num].pack("s").bytesize unless num.respond_to? :bit_length + + (num.bit_length / 8.0).ceil + end + + def ready? + total_message_bytes >= @max_bytes + end + + def ack? + @request.ack_ids.any? + end + + def delay? + @request.modify_deadline_ack_ids.any? + end + + def ack_ids + @request.ack_ids + end + + def modify_deadline_hash + @request.modify_deadline_ack_ids.zip( + @request.modify_deadline_seconds + ).group_by { |_ack_id, seconds| seconds } + end + + def total_message_bytes + @total_message_bytes + end + end + end + end + end + end +end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb index fba631f35afc..77053fd80455 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb @@ -13,7 +13,7 @@ # limitations under the License. -require "google/cloud/pubsub/subscriber/async_pusher" +require "google/cloud/pubsub/subscriber/async_unary_pusher" require "google/cloud/pubsub/subscriber/enumerator_queue" require "google/cloud/pubsub/service" require "google/cloud/errors" @@ -98,14 +98,7 @@ def wait! # Once all the callbacks are complete, we can stop the publisher # and send the final request to the steeam. - if @async_pusher - request = @async_pusher.stop - if request - Concurrent::Future.new(executor: @push_thread_pool) do - @request_queue.push request - end.execute - end - end + @async_pusher.stop if @async_pusher # will push current batch # Close the push thread pool now that the pusher is closed. @push_thread_pool.shutdown @@ -125,7 +118,7 @@ def acknowledge *messages return true if ack_ids.empty? synchronize do - @async_pusher ||= AsyncPusher.new self + @async_pusher ||= AsyncUnaryPusher.new self @async_pusher.acknowledge ack_ids @inventory.remove ack_ids unpause_streaming! @@ -141,7 +134,7 @@ def delay deadline, *messages return true if mod_ack_ids.empty? synchronize do - @async_pusher ||= AsyncPusher.new self + @async_pusher ||= AsyncUnaryPusher.new self @async_pusher.delay deadline, mod_ack_ids @inventory.remove mod_ack_ids unpause_streaming! @@ -168,7 +161,7 @@ def delay_inventory! synchronize do return true if @inventory.empty? - @async_pusher ||= AsyncPusher.new self + @async_pusher ||= AsyncUnaryPusher.new self @async_pusher.delay subscriber.deadline, @inventory.ack_ids end @@ -211,7 +204,7 @@ def background_run enum synchronize do # Create receipt of received messages reception - @async_pusher ||= AsyncPusher.new self + @async_pusher ||= AsyncUnaryPusher.new self @async_pusher.delay subscriber.deadline, received_ack_ids # Add received messages to inventory