diff --git a/src/amqpcat.cr b/src/amqpcat.cr index ceaf447..b7895aa 100644 --- a/src/amqpcat.cr +++ b/src/amqpcat.cr @@ -59,6 +59,28 @@ class AMQPCat end end + def rpc(exchange : String, routing_key : String, exchange_type : String, format : String) + STDIN.blocking = false + loop do + connection = @client.connect + channel = connection.channel + open_channel_declare_exchange(connection, exchange, exchange_type) + channel.basic_consume("amq.rabbitmq.reply-to") do |msg| + format_output(STDOUT, format, msg) + end + props = AMQP::Client::Properties.new(reply_to: "amq.rabbitmq.reply-to") + while line = STDIN.gets + channel.basic_publish line, exchange, routing_key, props: props + end + sleep 1 # wait for the last reply + connection.close + break + rescue ex + STDERR.puts ex.message + sleep 2 + end + end + private def queue(connection, name) channel = connection.channel channel.queue(name, auto_delete: true) diff --git a/src/cli.cr b/src/cli.cr index 41288a3..d56c485 100644 --- a/src/cli.cr +++ b/src/cli.cr @@ -29,6 +29,7 @@ p = OptionParser.parse do |parser| parser.banner = "Usage: #{File.basename PROGRAM_NAME} [arguments]" parser.on("-P", "--producer", "Producer mode, reading from STDIN, each line is a new message") { mode = :producer } parser.on("-C", "--consumer", "Consume mode, message bodies are written to STDOUT") { mode = :consumer } + parser.on("-R", "--rpc", "Remote prodecure call mode, reading from STDIN and returning result STDOUT") { mode = :rpc } parser.on("-u URI", "--uri=URI", "URI to AMQP server") { |v| uri = v } parser.on("-e EXCHANGE", "--exchange=EXCHANGE", "Exchange (default: '')") { |v| exchange = v } parser.on("-t EXCHANGETYPE", "--exchange-type=TYPE", "Exchange type (default: direct)") { |v| exchange_type = v } @@ -73,4 +74,14 @@ when :consumer abort p end cat.consume(exchange, routing_key, queue, queue_type, format, offset) +when :rpc + unless exchange + STDERR.puts "Error: Missing exchange argument." + abort p + end + unless routing_key + STDERR.puts "Error: Missing routing key argument." + abort p + end + cat.rpc(exchange, routing_key.not_nil!, exchange_type, format) end