Skip to content

Commit

Permalink
RPC mode
Browse files Browse the repository at this point in the history
Fixes #12
  • Loading branch information
carlhoerberg committed Apr 17, 2024
1 parent b38aab3 commit 93406a0
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
22 changes: 22 additions & 0 deletions src/amqpcat.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,28 @@ class AMQPCat
end
end

def rpc(exchange : String, routing_key : String, exchange_type : String, format : String)
STDIN.blocking = false
loop do
connection = @client.connect
channel = connection.channel
open_channel_declare_exchange(connection, exchange, exchange_type)
channel.basic_consume("amq.rabbitmq.reply-to") do |msg|
format_output(STDOUT, format, msg)
end
props = AMQP::Client::Properties.new(reply_to: "amq.rabbitmq.reply-to")
while line = STDIN.gets
channel.basic_publish line, exchange, routing_key, props: props
end
sleep 1 # wait for the last reply
connection.close
break
rescue ex
STDERR.puts ex.message
sleep 2
end
end

private def queue(connection, name)
channel = connection.channel
channel.queue(name, auto_delete: true)
Expand Down
11 changes: 11 additions & 0 deletions src/cli.cr
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ p = OptionParser.parse do |parser|
parser.banner = "Usage: #{File.basename PROGRAM_NAME} [arguments]"
parser.on("-P", "--producer", "Producer mode, reading from STDIN, each line is a new message") { mode = :producer }
parser.on("-C", "--consumer", "Consume mode, message bodies are written to STDOUT") { mode = :consumer }
parser.on("-R", "--rpc", "Remote prodecure call mode, reading from STDIN and returning result STDOUT") { mode = :rpc }
parser.on("-u URI", "--uri=URI", "URI to AMQP server") { |v| uri = v }
parser.on("-e EXCHANGE", "--exchange=EXCHANGE", "Exchange (default: '')") { |v| exchange = v }
parser.on("-t EXCHANGETYPE", "--exchange-type=TYPE", "Exchange type (default: direct)") { |v| exchange_type = v }
Expand Down Expand Up @@ -73,4 +74,14 @@ when :consumer
abort p
end
cat.consume(exchange, routing_key, queue, queue_type, format, offset)
when :rpc
unless exchange
STDERR.puts "Error: Missing exchange argument."
abort p
end
unless routing_key
STDERR.puts "Error: Missing routing key argument."
abort p
end
cat.rpc(exchange, routing_key.not_nil!, exchange_type, format)
end

0 comments on commit 93406a0

Please sign in to comment.