Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Reporter to not know about implementations #676

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/lavinmq/client/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ require "../amqp"
require "../stats"
require "../sortable_json"
require "../error"
require "../reporter"

module LavinMQ
class Client
class Channel
include Stats
include SortableJSON
include Reportable

getter id, name
property? running = true
Expand All @@ -38,6 +40,9 @@ module LavinMQ
@next_msg_body_tmp = IO::Memory.new

rate_stats({"ack", "get", "publish", "deliver", "redeliver", "reject", "confirm", "return_unroutable"})
reportables @unacked, @consumers, @visited, @found_queues do |r|
r.report_raw "global_prefetch=#{global_prefetch_count} prefetch=#{prefetch_count}"
end

def initialize(@client : Client, @id : UInt16)
@log = Log.for "channel[client=#{@client.remote_address} id=#{@id}]"
Expand Down
3 changes: 3 additions & 0 deletions src/lavinmq/client/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ require "../error"
require "./amqp_connection"
require "../config"
require "../http/handler/websocket"
require "../reporter"

module LavinMQ
class Client
include Stats
include SortableJSON
include Reportable

getter vhost, channels, log, name
getter user
Expand All @@ -36,6 +38,7 @@ module LavinMQ
@last_sent_frame = RoughTime.monotonic
rate_stats({"send_oct", "recv_oct"})
DEFAULT_EX = "amq.default"
reportables channels

def initialize(@socket : IO,
@connection_info : ConnectionInfo,
Expand Down
4 changes: 4 additions & 0 deletions src/lavinmq/queue/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ require "../segment_position"
require "log"
require "file_utils"
require "../replication/server"
require "../reporter"

module LavinMQ
class Queue
Expand All @@ -15,6 +16,9 @@ module LavinMQ
# Messages are refered to as SegmentPositions
# Deleted messages are written to acks.#{segment}
class MessageStore
include Reportable

reportables @segments, @acks, @deleted, @segment_msg_count, @requeued
PURGE_YIELD_INTERVAL = 16_384
Log = ::Log.for("MessageStore")
@segments = Hash(UInt32, MFile).new
Expand Down
4 changes: 4 additions & 0 deletions src/lavinmq/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require "../message"
require "../error"
require "./state"
require "./event"
require "../reporter"
require "./message_store"

module LavinMQ
Expand All @@ -17,6 +18,9 @@ module LavinMQ
include Observable(QueueEvent)
include Stats
include SortableJSON
include Reportable

reportables @consumers, @deliveries, @msg_store

@log : Log
@message_ttl : Int64?
Expand Down
130 changes: 85 additions & 45 deletions src/lavinmq/reporter.cr
Original file line number Diff line number Diff line change
@@ -1,55 +1,95 @@
module LavinMQ
module Reportable
abstract def __report(reporter : Reporter)

# Used to generate the __report method for Reportable. Items are a list
# of instance variables (must be prefiex with @) and getters (no prefix)
# that will be reported.
# If the variable/getter is a collection all items will also be reported if
# possible.
macro reportables(*items, &block)
def __report(reporter : Reporter)
# If a block is given, we include that body first, i.e
# the block is run before items is reported
{% if block %}
begin
# If the block is defined with a paratmeter, assign reporter to
# that name
{% if block.args.size > 0 %}
{{ block.args.first }} = reporter
{% end %}
{{ block.body }}
end
{% end %}
{% for item in items %}
reporter.report_capacity {{item.stringify}}, {{item.id}}
reporter.report {{item.id}}
{% end %}
end

def __report_type_name
{{@type.name.split("::").last.titleize}}
end
end
end

class Reporter
def self.report(s)
puts_size_capacity s.@users
s.users.each do |name, user|
puts "User #{name}"
puts_size_capacity user.@tags, 4
puts_size_capacity user.@permissions, 4
puts_size_capacity user.@acl_read_cache, 4
puts_size_capacity user.@acl_write_cache, 4
puts_size_capacity user.@acl_config_cache, 4
def initialize(@io : IO = STDOUT)
@level = -1
end

def report(reportables : Enumerable(Tuple(String, Reportable)))
reportables.each do |name, reportable|
report(reportable, header: "#{reportable.__report_type_name} #{name}")
end
puts_size_capacity s.@vhosts
s.vhosts.each do |name, vh|
puts "VHost #{name}"
puts_size_capacity vh.@exchanges, 4
puts_size_capacity vh.@queues, 4
vh.queues.each do |_, q|
puts " #{q.name} #{q.durable? ? "durable" : ""} args=#{q.arguments}"
puts_size_capacity q.@consumers, 6
puts_size_capacity q.@deliveries, 6
puts_size_capacity q.@msg_store.@segments, 6
puts_size_capacity q.@msg_store.@acks, 6
puts_size_capacity q.@msg_store.@deleted, 6
puts_size_capacity q.@msg_store.@segment_msg_count, 6
puts_size_capacity q.@msg_store.@requeued, 6
end
puts_size_capacity vh.@connections
vh.connections.each do |c|
puts " #{c.name}"
puts_size_capacity c.@channels, 4
c.channels.each_value do |ch|
puts " #{ch.id} global_prefetch=#{ch.global_prefetch_count} prefetch=#{ch.prefetch_count}"
puts_size_capacity ch.@unacked, 6
puts_size_capacity ch.@consumers, 6
puts_size_capacity ch.@visited, 6
puts_size_capacity ch.@found_queues, 6
end
end
end

def report(reportables : Enumerable(Tuple(UInt16, Reportable)))
reportables.each do |id, reportable|
report(reportable, header: "#{reportable.__report_type_name} #{id}")
end
end

macro puts_size_capacity(obj, indent = 0)
{{ indent }}.times do
STDOUT << ' '
def report(reportables : Enumerable(Reportable))
reportables.each &->self.report(Reportable)
end

def report(reportable : Reportable, header : String? = nil)
@level += 1
if header
report_raw(header)
@level += 1
end
reportable.__report(self)
if header
@level -= 1
end
@level -= 1
end

def report(_any)
# nop
end

def report_raw(value : String)
@io << indent << value << '\n'
end

def report_capacity(name, obj)
@io << indent << name << "\tsize=" << obj.size
if obj.responds_to?(:capacity)
@io << "\tcapacity=" << obj.capacity
end
STDOUT << "{{ obj.name }}"
STDOUT << " size="
STDOUT << {{obj}}.size
STDOUT << " capacity="
STDOUT << {{obj}}.capacity
STDOUT << '\n'
@io << '\n'
end

macro indent
" "*(@level*2)
end

def self.report(s)
r = self.new
r.report(s)
end
end
end
4 changes: 4 additions & 0 deletions src/lavinmq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ require "./connection_info"
require "./proxy_protocol"
require "./client/client"
require "./stats"
require "./reporter"

module LavinMQ
class Server
getter vhosts, users, data_dir, parameters
getter? closed, flow
include ParameterTarget
include Reportable

reportables users, vhosts

@start = Time.monotonic
@closed = false
Expand Down
14 changes: 8 additions & 6 deletions src/lavinmq/user.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require "json"
require "./password"
require "./sortable_json"
require "./reporter"

module LavinMQ
enum Tag
Expand All @@ -17,10 +18,17 @@ module LavinMQ

class User
include SortableJSON
include Reportable
getter name, password, permissions
property tags, plain_text_password
alias Permissions = NamedTuple(config: Regex, read: Regex, write: Regex)

@acl_read_cache = Hash({String, String}, Bool).new
@acl_config_cache = Hash({String, String}, Bool).new
@acl_write_cache = Hash({String, String}, Bool).new

reportables tags, permissions, @acl_read_cache, @acl_write_cache, @acl_config_cache

@name : String
@permissions = Hash(String, Permissions).new
@password : Password? = nil
Expand Down Expand Up @@ -140,8 +148,6 @@ module LavinMQ
}
end

@acl_write_cache = Hash({String, String}, Bool).new

def can_write?(vhost, name)
cache_key = {vhost, name}
unless @acl_write_cache.has_key? cache_key
Expand All @@ -151,8 +157,6 @@ module LavinMQ
@acl_write_cache[cache_key]
end

@acl_read_cache = Hash({String, String}, Bool).new

def can_read?(vhost, name)
cache_key = {vhost, name}
unless @acl_read_cache.has_key? cache_key
Expand All @@ -162,8 +166,6 @@ module LavinMQ
@acl_read_cache[cache_key]
end

@acl_config_cache = Hash({String, String}, Bool).new

def can_config?(vhost, name)
cache_key = {vhost, name}
unless @acl_config_cache.has_key? cache_key
Expand Down
4 changes: 4 additions & 0 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@ require "./queue"
require "./schema"
require "./event_type"
require "./stats"
require "./reporter"

module LavinMQ
class VHost
include SortableJSON
include Stats
include Reportable

rate_stats({"channel_closed", "channel_created", "connection_closed", "connection_created",
"queue_declared", "queue_deleted", "ack", "deliver", "get", "publish", "confirm",
"redeliver", "reject", "consumer_added", "consumer_removed"})

reportables exchanges, queues, connections

getter name, exchanges, queues, data_dir, operator_policies, policies, parameters, shovels,
direct_reply_consumers, connections, dir, users
property? flow = true
Expand Down
Loading