Skip to content

Commit

Permalink
ensure limits in partition count
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld committed Jan 27, 2025
1 parent d202eb3 commit e2c429f
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- [Feature] Provide ability to navigate to a timestamp in the Explorer (Pro).
- [Enhancement] Provide better support for fully transactional consumers.
- [Enhancement] Error out when `#setup` is called after `#enable!`.
- [Fix] Direct URL access with too big partition causes librdkafka crash.
- [Fix] Fix incorrect breadcrumbs for pending consumer jobs.
- [Fix] Allow for using default search matchers in Karafka Web UI topics including Errors.
- [Maintenance] Require `karafka-core` `>= 2.4.8` and `karafka` `>= 2.4.16`.
Expand Down
4 changes: 2 additions & 2 deletions lib/karafka/web/pro/ui/routes/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ class Errors < Base
r.on 'errors' do
controller = Controllers::ErrorsController.new(params)

r.get Integer, Integer do |partition_id, offset|
r.get :partition_id, Integer do |partition_id, offset|
if params.current_offset != -1
r.redirect root_path('errors', partition_id, params.current_offset)
else
controller.show(partition_id, offset)
end
end

r.get Integer do |partition_id|
r.get :partition_id do |partition_id|
controller.partition(partition_id)
end

Expand Down
28 changes: 19 additions & 9 deletions lib/karafka/web/pro/ui/routes/explorer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,34 @@ class Explorer < Base
r.on 'messages' do
controller = Controllers::Explorer::MessagesController.new(params)

r.post String, Integer, Integer, 'republish' do |topic_id, partition_id, offset|
r.post(
String, :partition_id, Integer, 'republish'
) do |topic_id, partition_id, offset|
controller.republish(topic_id, partition_id, offset)
end

r.get String, Integer, Integer, 'download' do |topic_id, partition_id, offset|
r.get(
String, :partition_id, Integer, 'download'
) do |topic_id, partition_id, offset|
controller.download(topic_id, partition_id, offset)
end

r.get String, Integer, Integer, 'export' do |topic_id, partition_id, offset|
r.get(
String, :partition_id, Integer, 'export'
) do |topic_id, partition_id, offset|
controller.export(topic_id, partition_id, offset)
end
end

controller = Controllers::Explorer::ExplorerController.new(params)

r.get String, Integer, 'recent' do |topic_id, partition_id|
r.get String, :partition_id, 'recent' do |topic_id, partition_id|
controller.recent(topic_id, partition_id)
end

r.get String, Integer, Integer, 'surrounding' do |topic_id, partition_id, offset|
r.get(
String, :partition_id, Integer, 'surrounding'
) do |topic_id, partition_id, offset|
controller.surrounding(topic_id, partition_id, offset)
end

Expand All @@ -50,18 +58,20 @@ class Explorer < Base
end

# Jumps to offset matching the expected time
r.get String, Integer, 'closest', Time do |topic_id, partition_id, time|
r.get String, :partition_id, 'closest', Time do |topic_id, partition_id, time|
controller.closest(topic_id, partition_id, time)
end

# Jumps to the offset matching the expected timestamp
r.get String, Integer, 'closest', Integer do |topic_id, partition_id, timestamp|
r.get(
String, :partition_id, 'closest', Integer
) do |topic_id, partition_id, timestamp|
# To simplify we just convert timestamp to time with ms precision
time = Time.at(timestamp / 1_000.0)
controller.closest(topic_id, partition_id, time)
end

r.get String, Integer, Integer do |topic_id, partition_id, offset|
r.get String, :partition_id, Integer do |topic_id, partition_id, offset|
# If when viewing given message we get an offset of different message, we should
# redirect there. This allows us to support pagination with the current engine
if params.current_offset != -1
Expand All @@ -71,7 +81,7 @@ class Explorer < Base
end
end

r.get String, Integer do |topic_id, partition_id|
r.get String, :partition_id do |topic_id, partition_id|
controller.partition(topic_id, partition_id)
end

Expand Down
10 changes: 6 additions & 4 deletions lib/karafka/web/pro/ui/routes/scheduled_messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,19 @@ class ScheduledMessages < Base
controller.topic(topic_id)
end

r.get String, Integer do |topic_id, partition_id|
r.get String, :partition_id do |topic_id, partition_id|
controller.partition(topic_id, partition_id)
end

# Jumps to offset matching the expected time
r.get String, Integer, 'closest', Time do |topic_id, partition_id, time|
r.get String, :partition_id, 'closest', Time do |topic_id, partition_id, time|
controller.closest(topic_id, partition_id, time)
end

# Jumps to the offset matching the expected timestamp
r.get String, Integer, 'closest', Integer do |topic_id, partition_id, timestamp|
r.get(
String, :partition_id, 'closest', Integer
) do |topic_id, partition_id, timestamp|
# To simplify we just convert timestamp to time with ms precision
time = Time.at(timestamp / 1_000.0)
controller.closest(topic_id, partition_id, time)
Expand All @@ -52,7 +54,7 @@ class ScheduledMessages < Base
controller = Controllers::ScheduledMessages::MessagesController.new(params)

r.post(
String, Integer, Integer, 'cancel'
String, :partition_id, Integer, 'cancel'
) do |topic_id, partition_id, message_offset|
controller.cancel(topic_id, partition_id, message_offset)
end
Expand Down
11 changes: 11 additions & 0 deletions lib/karafka/web/ui/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class Base < Roda
end

plugin :class_matchers
plugin :symbol_matchers

# Time matcher with optional hours, minutes and seconds
TIME_MATCHER = %r{(\d{4}-\d{2}-\d{2}/?(\d{2})?(:\d{2})?(:\d{2})?)}
Expand All @@ -128,6 +129,16 @@ class Base < Roda
raise Errors::Ui::NotFoundError
end

# Partitions ids cannot be bigger than 32 bit C int. We use this matcher to ensure we
# only support that big partition numbers. Otherwise librdkafka would crash.
symbol_matcher :partition_id, /(\d{1,14})/ do |value|
int_value = value.to_i

raise Errors::Ui::NotFoundError unless int_value.between?(0, 2_147_483_647)

[int_value]
end

# Allows us to build current path with additional params + it merges existing params into
# the query data. Query data takes priority over request params.
# @param query_data [Hash] query params we want to add to the current path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,15 @@
end
end

context 'when given partition is more than 32bit C int' do
before { get "explorer/#{topic}/2147483648" }

it do
expect(response).not_to be_ok
expect(response.status).to eq(404)
end
end

context 'when no data in the given partition' do
before { get "explorer/#{topic}/0" }

Expand Down

0 comments on commit e2c429f

Please sign in to comment.