Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename clustering_max_lag -> clustering_max_unsynced_actions and lag -> lag_in_bytes #810

Merged
merged 2 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lavinmq/clustering/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ module LavinMQ
end

private def stream_changes(socket, lz4)
acks = Channel(Int64).new(@config.clustering_max_lag)
acks = Channel(Int64).new(@config.clustering_max_unsynced_actions)
spawn send_ack_loop(acks, socket), name: "Send ack loop"
loop do
filename_len = lz4.read_bytes Int32, IO::ByteFormat::LittleEndian
Expand Down
5 changes: 3 additions & 2 deletions src/lavinmq/clustering/follower.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module LavinMQ

@acked_bytes = 0_i64
@sent_bytes = 0_i64
@actions = Channel(Action).new(Config.instance.clustering_max_lag)
@actions = Channel(Action).new(Config.instance.clustering_max_unsynced_actions)
getter id = -1
getter remote_address

Expand Down Expand Up @@ -176,14 +176,15 @@ module LavinMQ
remote_address: @remote_address.to_s,
sent_bytes: @sent_bytes,
acked_bytes: @acked_bytes,
lag_in_bytes: lag_in_bytes,
compression_ratio: @lz4.compression_ratio,
uncompressed_bytes: @lz4.uncompressed_bytes,
compressed_bytes: @lz4.compressed_bytes,
id: @id.to_s(36),
}.to_json(json)
end

def lag : Int64
def lag_in_bytes : Int64
@sent_bytes - @acked_bytes
end
end
Expand Down
22 changes: 11 additions & 11 deletions src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ module LavinMQ
property clustering_advertised_uri : String? = nil
property clustering_bind = "127.0.0.1"
property clustering_port = 5679
property clustering_max_lag = 8192 # number of clustering actions
property max_deleted_definitions = 8192 # number of deleted queues, unbinds etc that compacts the definitions file
property clustering_max_unsynced_actions = 8192 # number of unsynced clustering actions
property max_deleted_definitions = 8192 # number of deleted queues, unbinds etc that compacts the definitions file
property consumer_timeout : UInt64? = nil
property consumer_timeout_loop_interval = 60 # seconds
@@instance : Config = self.new
Expand Down Expand Up @@ -133,8 +133,8 @@ module LavinMQ
p.on("--clustering-bind=BIND", "Listen for clustering followers on this address (default: localhost)") do |v|
@clustering_bind = v
end
p.on("--clustering-max-lag=ACTIONS", "Max unsynced replicated messages") do |v|
@clustering_max_lag = v.to_i
p.on("--clustering-max-unsynced-actions=ACTIONS", "Maximum unsynced actions") do |v|
@clustering_max_unsynced_actions = v.to_i
end
p.on("--clustering-etcd-endpoints=URIs", "Comma separeted host/port pairs (default: 127.0.0.1:2379)") do |v|
@clustering_etcd_endpoints = v
Expand Down Expand Up @@ -235,13 +235,13 @@ module LavinMQ
private def parse_clustering(settings)
settings.each do |config, v|
case config
when "enabled" then @clustering = true?(v)
when "etcd_prefix" then @clustering_etcd_prefix = v
when "etcd_endpoints" then @clustering_etcd_endpoints = v
when "advertised_uri" then @clustering_advertised_uri = v
when "bind" then @clustering_bind = v
when "port" then @clustering_port = v.to_i32
when "max_lag" then @clustering_max_lag = v.to_i32
when "enabled" then @clustering = true?(v)
when "etcd_prefix" then @clustering_etcd_prefix = v
when "etcd_endpoints" then @clustering_etcd_endpoints = v
when "advertised_uri" then @clustering_advertised_uri = v
when "bind" then @clustering_bind = v
when "port" then @clustering_port = v.to_i32
when "max_unsynced_actions" then @clustering_max_unsynced_actions = v.to_i32
else
STDERR.puts "WARNING: Unrecognized configuration 'clustering/#{config}'"
end
Expand Down
4 changes: 2 additions & 2 deletions src/lavinmq/http/controller/prometheus.cr
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ module LavinMQ
type: "gauge",
help: "Amount of follower nodes connected"})
@amqp_server.followers.each do |f|
writer.write({name: "follower_lag",
writer.write({name: "follower_lag_in_bytes",
labels: {id: f.id.to_s(36)},
value: f.lag,
value: f.lag_in_bytes,
type: "gauge",
help: "Bytes that hasn't been synchronized with the follower yet"})
end
Expand Down
7 changes: 0 additions & 7 deletions src/lavinmqctl.cr
Original file line number Diff line number Diff line change
Expand Up @@ -678,13 +678,6 @@ class LavinMQCtl
handle_response(resp, 200)
body = JSON.parse(resp.body)
if followers = body[0].dig("followers").as_a
followers.map do |f|
{
id: f.dig("id"),
address: f.dig("remote_address"),
lag: f.dig("sent_bytes").as_i64 - f.dig("acked_bytes").as_i64,
}
end
cluster_status_obj = {
this_node: body.dig(0, "name"),
version: body.dig(0, "applications", 0, "version"),
Expand Down
Loading