Skip to content
This repository has been archived by the owner on Aug 25, 2022. It is now read-only.

Commit

Permalink
Add Subscriber Async Unary Pusher
Browse files Browse the repository at this point in the history
Instead of sending Ack/ModAck messages on the stream, send them as normal
RPC messages. This relieves some GRPC stream congestion.
Modifies the Async Stream Pusher batch to make RPC calls instead.
  • Loading branch information
blowmage committed Jun 11, 2018
1 parent 58810b5 commit 7f9d758
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ module Pubsub
class Subscriber
##
# @private
# # AsyncPusher
# # AsyncStreamPusher
#
class AsyncPusher
class AsyncStreamPusher
include MonitorMixin

attr_reader :batch
Expand Down Expand Up @@ -108,9 +108,7 @@ def stop
@background_thread.kill if @background_thread
end

return nil if @batch.nil?

@batch.request
push_batch_request!
end

def started?
Expand Down Expand Up @@ -147,10 +145,7 @@ def run_background
def push_batch_request!
return unless @batch

request = @batch.request
Concurrent::Future.new(executor: @stream.push_thread_pool) do
@stream.push request
end.execute
@stream.push @batch.request

@batch = nil
@batch_created_at = nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


require "monitor"
require "concurrent"

module Google
  module Cloud
    module Pubsub
      class Subscriber
        ##
        # @private
        # # AsyncUnaryPusher
        #
        # Collects ack IDs (to acknowledge, or to change the ack deadline of)
        # into a size-bounded batch and flushes that batch as unary calls to
        # the subscriber's service (`acknowledge` / `modify_ack_deadline`),
        # each wrapped in a Concurrent::Future run on the stream's push thread
        # pool. A batch is flushed when it is full, when it has been pending
        # longer than `interval` seconds, or when the pusher is stopped.
        #
        class AsyncUnaryPusher
          include MonitorMixin

          attr_reader :batch
          attr_reader :max_bytes, :interval

          ##
          # @param stream the owning stream; must respond to `subscriber`
          #   (which provides `service` and `subscription_name`) and
          #   `push_thread_pool`.
          # @param max_bytes [Integer] estimated size at which a batch is
          #   considered full.
          # @param interval [Numeric] maximum number of seconds a batch may
          #   wait before the background thread flushes it.
          def initialize stream, max_bytes: 10000000, interval: 1.0
            # init MonitorMixin. This must happen before new_cond is called,
            # otherwise Ruby 2.7 raises ThreadError ("already initialized").
            super()

            @stream = stream

            @max_bytes = max_bytes
            @interval = interval

            @cond = new_cond

            # Start out explicitly "not stopped" so stopped? is a real boolean.
            @stopped = false
          end

          ##
          # Queues the given ack IDs to be acknowledged.
          #
          # @param ack_ids [Array<String>] ack IDs to acknowledge.
          # @return [true, nil] true when there was nothing to queue, otherwise
          #   nil.
          def acknowledge ack_ids
            return true if ack_ids.empty?

            synchronize do
              ack_ids.each { |ack_id| append_ack ack_id }

              # Wake the background thread so it re-evaluates the batch.
              @cond.signal
            end

            nil
          end

          ##
          # Queues the given ack IDs to have their ack deadline changed.
          #
          # @param deadline [Integer] the new deadline, in seconds.
          # @param ack_ids [Array<String>] ack IDs to modify.
          # @return [true, nil] true when there was nothing to queue, otherwise
          #   nil.
          def delay deadline, ack_ids
            return true if ack_ids.empty?

            synchronize do
              ack_ids.each { |ack_id| append_delay deadline, ack_id }

              # Wake the background thread so it re-evaluates the batch.
              @cond.signal
            end

            nil
          end

          ##
          # Marks the pusher as stopped, kills the background thread and
          # pushes whatever is left in the current batch.
          #
          # @return [nil]
          def stop
            synchronize do
              @stopped = true

              # Stop any background activity, clean up happens in wait!
              @background_thread.kill if @background_thread

              # Flush while still holding the monitor so a concurrent
              # acknowledge/delay cannot swap @batch out from under us.
              push_batch_request!
            end
          end

          ##
          # @return [Boolean] true until #stop has been called.
          def started?
            !stopped?
          end

          ##
          # @return [Boolean] true once #stop has been called.
          def stopped?
            synchronize { @stopped }
          end

          protected

          # Adds one ack ID to the current batch, first flushing the batch and
          # starting a new one when the ID does not fit (or no batch exists).
          # Must be called while holding the monitor.
          def append_ack ack_id
            unless @batch && @batch.try_ack(ack_id)
              push_batch_request! # no-op when there is no current batch
              @batch = Batch.new max_bytes: @max_bytes
              @batch.ack ack_id
            end

            batch_appended!
          end

          # Adds one deadline change to the current batch, first flushing the
          # batch and starting a new one when the entry does not fit (or no
          # batch exists). Must be called while holding the monitor.
          def append_delay deadline, ack_id
            unless @batch && @batch.try_delay(deadline, ack_id)
              push_batch_request! # no-op when there is no current batch
              @batch = Batch.new max_bytes: @max_bytes
              @batch.delay deadline, ack_id
            end

            batch_appended!
          end

          # Bookkeeping after an entry was added: records when the batch was
          # started, lazily starts the background thread, and flushes the
          # batch right away when it has reached max_bytes.
          def batch_appended!
            @batch_created_at ||= Time.now
            @background_thread ||= Thread.new { run_background }

            push_batch_request! if @batch.ready?
          end

          # Body of the background thread: flushes the current batch once it
          # has been pending for longer than the interval; otherwise sleeps on
          # the condition variable until signaled or until the interval is due.
          def run_background
            synchronize do
              until @stopped
                if @batch.nil?
                  @cond.wait
                  next
                end

                time_since_first_publish = Time.now - @batch_created_at
                if time_since_first_publish > @interval
                  # interval met, publish the batch...
                  push_batch_request!
                  @cond.wait
                else
                  # still waiting for the interval to publish the batch...
                  @cond.wait(@interval - time_since_first_publish)
                end
              end
            end
          end

          # Sends the current batch (if any) and clears it. Acks go out as one
          # `acknowledge` call; deadline changes go out as one
          # `modify_ack_deadline` call per distinct deadline value.
          def push_batch_request!
            return unless @batch

            service = @stream.subscriber.service
            name = @stream.subscriber.subscription_name
            pool = @stream.push_thread_pool

            if @batch.ack?
              ack_ids = @batch.ack_ids
              Concurrent::Future.new(executor: pool) do
                service.acknowledge name, *ack_ids
              end.execute
            end
            if @batch.delay?
              @batch.modify_deadline_hash.each do |delay_seconds, delay_ack_ids|
                Concurrent::Future.new(executor: pool) do
                  service.modify_ack_deadline name, delay_ack_ids, delay_seconds
                end.execute
              end
            end

            @batch = nil
            @batch_created_at = nil
          end

          ##
          # Accumulates ack IDs and deadline changes in a
          # StreamingPullRequest object, together with a running size
          # estimate used to decide when the batch is full.
          class Batch
            attr_reader :max_bytes, :request, :total_message_bytes

            # @param max_bytes [Integer] estimated size at which the batch is
            #   considered full.
            def initialize max_bytes: 10000000
              @max_bytes = max_bytes
              @request = Google::Pubsub::V1::StreamingPullRequest.new
              @total_message_bytes = 0
            end

            # Unconditionally adds an ack ID and updates the size estimate.
            def ack ack_id
              @request.ack_ids << ack_id
              @total_message_bytes += addl_ack_bytes ack_id
            end

            # Adds an ack ID only if the batch stays below max_bytes.
            #
            # @return [Boolean] whether the ack ID was added.
            def try_ack ack_id
              addl_bytes = addl_ack_bytes ack_id
              return false if total_message_bytes + addl_bytes >= @max_bytes

              ack ack_id
              true
            end

            # Estimated bytes an ack ID adds: its bytesize plus 2 (presumably
            # per-entry encoding overhead).
            def addl_ack_bytes ack_id
              ack_id.bytesize + 2
            end

            # Unconditionally adds a deadline change and updates the size
            # estimate. Deadlines and ack IDs are stored in parallel lists.
            def delay deadline, ack_id
              @request.modify_deadline_seconds << deadline
              @request.modify_deadline_ack_ids << ack_id
              @total_message_bytes += addl_delay_bytes deadline, ack_id
            end

            # Adds a deadline change only if the batch stays below max_bytes.
            #
            # @return [Boolean] whether the entry was added.
            def try_delay deadline, ack_id
              addl_bytes = addl_delay_bytes deadline, ack_id
              return false if total_message_bytes + addl_bytes >= @max_bytes

              delay deadline, ack_id
              true
            end

            # Estimated bytes a deadline change adds: the integer's byte
            # count, the ack ID's bytesize, plus 4 (presumably per-entry
            # encoding overhead).
            def addl_delay_bytes deadline, ack_id
              bytes_for_int(deadline) + ack_id.bytesize + 4
            end

            # Number of whole bytes needed for the integer's significant bits.
            def bytes_for_int num
              # Ruby 2.0 does not have Integer#bit_length
              return [num].pack("s").bytesize unless num.respond_to? :bit_length

              (num.bit_length / 8.0).ceil
            end

            # @return [Boolean] whether the size estimate reached max_bytes.
            def ready?
              total_message_bytes >= @max_bytes
            end

            # @return [Boolean] whether the batch holds any ack IDs.
            def ack?
              @request.ack_ids.any?
            end

            # @return [Boolean] whether the batch holds any deadline changes.
            def delay?
              @request.modify_deadline_ack_ids.any?
            end

            # @return the ack IDs queued for acknowledgement.
            def ack_ids
              @request.ack_ids
            end

            # Groups the queued deadline changes by deadline.
            #
            # @return [Hash{Integer => Array<String>}] deadline seconds mapped
            #   to the ack IDs (and only the ack IDs) that should get that
            #   deadline, in the order they were queued.
            def modify_deadline_hash
              pairs = @request.modify_deadline_ack_ids.zip(
                @request.modify_deadline_seconds
              )
              pairs.each_with_object({}) do |(ack_id, seconds), grouped|
                (grouped[seconds] ||= []) << ack_id
              end
            end
          end
        end
      end
    end
  end
end
19 changes: 6 additions & 13 deletions google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.


require "google/cloud/pubsub/subscriber/async_pusher"
require "google/cloud/pubsub/subscriber/async_unary_pusher"
require "google/cloud/pubsub/subscriber/enumerator_queue"
require "google/cloud/pubsub/service"
require "google/cloud/errors"
Expand Down Expand Up @@ -98,14 +98,7 @@ def wait!

# Once all the callbacks are complete, we can stop the publisher
# and send the final request to the stream.
if @async_pusher
request = @async_pusher.stop
if request
Concurrent::Future.new(executor: @push_thread_pool) do
@request_queue.push request
end.execute
end
end
@async_pusher.stop if @async_pusher # will push current batch

# Close the push thread pool now that the pusher is closed.
@push_thread_pool.shutdown
Expand All @@ -125,7 +118,7 @@ def acknowledge *messages
return true if ack_ids.empty?

synchronize do
@async_pusher ||= AsyncPusher.new self
@async_pusher ||= AsyncUnaryPusher.new self
@async_pusher.acknowledge ack_ids
@inventory.remove ack_ids
unpause_streaming!
Expand All @@ -141,7 +134,7 @@ def delay deadline, *messages
return true if mod_ack_ids.empty?

synchronize do
@async_pusher ||= AsyncPusher.new self
@async_pusher ||= AsyncUnaryPusher.new self
@async_pusher.delay deadline, mod_ack_ids
@inventory.remove mod_ack_ids
unpause_streaming!
Expand All @@ -168,7 +161,7 @@ def delay_inventory!
synchronize do
return true if @inventory.empty?

@async_pusher ||= AsyncPusher.new self
@async_pusher ||= AsyncUnaryPusher.new self
@async_pusher.delay subscriber.deadline, @inventory.ack_ids
end

Expand Down Expand Up @@ -211,7 +204,7 @@ def background_run enum

synchronize do
# Create receipt of received messages reception
@async_pusher ||= AsyncPusher.new self
@async_pusher ||= AsyncUnaryPusher.new self
@async_pusher.delay subscriber.deadline, received_ack_ids

# Add received messages to inventory
Expand Down

0 comments on commit 7f9d758

Please sign in to comment.