Skip to content

Commit

Permalink
Merge pull request #363 from bestie/roundrobin-topic-switching
Browse files Browse the repository at this point in the history
Configurable strategy for consuming multiple topics
  • Loading branch information
deepredsky authored Jan 24, 2024
2 parents d7d3545 + 236478f commit e4ed888
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 3 deletions.
3 changes: 3 additions & 0 deletions lib/racecar/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ class Config < KingKonf::Config
desc "Used only by the liveness probe: Max time (in seconds) between liveness events before the process is considered not healthy"
integer :liveness_probe_max_interval, default: 5

desc "Strategy for switching topics when there are multiple subscriptions. `exhaust-topic` will only switch when the consumer poll returns no messages. `round-robin` will switch after each poll regardless.\nWarning: `round-robin` will be the default in Racecar 3.x"
string :multi_subscription_strategy, allowed_values: %w(round-robin exhaust-topic), default: "exhaust-topic"

# The error handler must be set directly on the object.
attr_reader :error_handler

Expand Down
12 changes: 9 additions & 3 deletions lib/racecar/consumer_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def poll_with_retries(max_wait_time_ms)

# polls a message for the current consumer, handling any API edge cases.
def poll_current_consumer(max_wait_time_ms)
@last_poll_read_nil_message = false
msg = current.poll(max_wait_time_ms)
rescue Rdkafka::RdkafkaError => e
case e.code
Expand Down Expand Up @@ -212,9 +213,14 @@ def reset_current_consumer
end

def maybe_select_next_consumer
return unless @last_poll_read_nil_message
@last_poll_read_nil_message = false
select_next_consumer
case @config.multi_subscription_strategy
when "round-robin"
select_next_consumer
else # "exhaust-topic"
if @last_poll_read_nil_message
select_next_consumer
end
end
end

def select_next_consumer
Expand Down
54 changes: 54 additions & 0 deletions spec/consumer_set_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -572,5 +572,59 @@ def message_generator(messages)
count.times { polled += consumer_set.batch_poll(100) rescue [] }
expect(polled).to eq [:msg1, :msg1, :msg1, :msgN, :msgN, :msgN]
end

context "when multiple consumers are configured as 'round-robin'" do
before do
config.multi_subscription_strategy = "round-robin"
allow(rdconsumer1).to receive(:poll).and_return(topic1_message)
allow(rdconsumer2).to receive(:poll).and_return(topic2_message)
allow(rdconsumer3).to receive(:poll).and_return(topic3_message)
end

let(:config) { Racecar::Config.new }
let(:interval) { 1000.0 }
let(:topic1_message) { double(:topic1_message) }
let(:topic2_message) { double(:topic2_message) }
let(:topic3_message) { double(:topic3_message) }


describe "#poll" do
it "consumes 1 message from each topic in turn" do
messages = 6.times.map {
consumer_set.poll(interval)
}

expect(messages).to eq([
topic1_message,
topic2_message,
topic3_message,
topic1_message,
topic2_message,
topic3_message,
])
end
end

describe "#batch_poll" do
before do
config.fetch_messages = 1
end

it "consumes 1 batch from each topic in turn" do
messages = 6.times.map {
consumer_set.batch_poll(interval)
}

expect(messages).to eq([
[topic1_message],
[topic2_message],
[topic3_message],
[topic1_message],
[topic2_message],
[topic3_message],
])
end
end
end
end
end

0 comments on commit e4ed888

Please sign in to comment.