diff --git a/CHANGES b/CHANGES new file mode 100644 index 0000000..e377427 --- /dev/null +++ b/CHANGES @@ -0,0 +1,2 @@ += 0.1.0 - 4-Oct-2018 +* Initial release diff --git a/README.md b/README.md index 5df5b4d..43d6271 100644 --- a/README.md +++ b/README.md @@ -29,10 +29,13 @@ Or install it yourself as: ### Initialize a client +It is not recommended to directly create an actual client through `new` operation. Follow the example by specifying a protocol. This allows to easily switch the underlying messaging system from one type to another. Currently `:Stomp` and `:Kafka` are implemented. + ``` ManageIQ::Messaging.logger = Logger.new(STDOUT) client = ManageIQ::Messaging::Client.open( + :protocol => 'Stomp', :host => 'localhost', :port => 61616, :password => 'smartvm', @@ -46,10 +49,11 @@ Or install it yourself as: client.close ``` -Alternately, you can pass a block to `.open` without the need to explicitly close the client +Alternatively, you can pass a block to `.open` without the need to explicitly close the client. ``` ManageIQ::Messaging::Client.open( + :protocol => 'Stomp', :host => 'localhost', :port => 61616, :password => 'smartvm', @@ -76,15 +80,15 @@ This is the one-to-one publish/subscribe pattern. Multiple subscribers can subsc } ) - client.subscribe_messages(:service => 'ems_operation', :affinity => 'ems_amazon1', :limit => 10) do |messages| + client.subscribe_messages(:service => 'ems_operation', :affinity => 'ems_amazon1') do |messages| messages.each do |msg| # do stuff with msg.message and msg.payload - client.ack(msg.ack_ref) # ack is required, but you can do it before or after "do stuff" + client.ack(msg.ack_ref) end end # You can create a second client instance and call subscribe_messages with - # the same options. Then, both clients will take turns to consume the messages. + # the same options. Then both clients will take turns to consume the messages. ``` For better sending performance, you can publish a collection of messages together @@ -95,7 +99,7 @@ For better sending performance, you can publish a collection of messages togethe client.publish_messages([msg1, msg2]) ``` -Provide a block if you want `#publish_message` to wait on a response from the subscriber. +Provide a block if you want `#publish_message` to wait on a response from the subscriber. This feature may not be supported by every underlying messaging system. ``` client.publish_message( @@ -150,7 +154,7 @@ This is the one-to-many publish/subscribe pattern. Multiple subscribers can subs end ``` -By default, events are delivered to live subscribers only. `subscribe_topic`'s `persist_ref` is not required. If a subscriber wants to receive the events it missed when it is offline, it should always create with same same `client_ref` and subscribe to the topic with the same `persist_ref`. +By default, events are delivered to live subscribers only. Some messaging systems support persistence with options. ## Development diff --git a/examples/message.rb b/examples/message.rb index 255cab9..ea2cb35 100755 --- a/examples/message.rb +++ b/examples/message.rb @@ -23,7 +23,7 @@ def run puts "produced 5 messages" puts "consumer" - client.subscribe_messages(:service => 'ems_operation', :affinity => 'ems_amazon1', :limit => 10) do |messages| + client.subscribe_messages(:service => 'ems_operation', :affinity => 'ems_amazon1') do |messages| messages.each do |msg| do_stuff(msg) client.ack(msg.ack_ref) diff --git a/lib/manageiq/messaging/client.rb b/lib/manageiq/messaging/client.rb index 6764155..4d5e296 100644 --- a/lib/manageiq/messaging/client.rb +++ b/lib/manageiq/messaging/client.rb @@ -1,13 +1,42 @@ module ManageIQ module Messaging + # The abstract client class. It defines methods needed to publish or subscribe messages. + # It is not recommended to directly create a solid subclass instance. The proper way is + # to call class method +Client.open+ with desired protocol. For example: + # + # client = ManageIQ::Messaging::Client.open( + # :protocol => 'Stomp', + # :host => 'localhost', + # :port => 61616, + # :password => 'smartvm', + # :username => 'admin', + # :client_ref => 'generic_1', + # :encoding => 'json' + # ) + # + # To close the connection one needs to explicitly call +client.close+. + # Alternatively if a block is given for the +open+ method, the connection will be closed + # automatically before existing the block. For example: + # + # ManageIQ::Messaging::Client.open( + # :protocol => 'Stomp' + # :host => 'localhost', + # :port => 61616, + # :password => 'smartvm', + # :username => 'admin', + # :client_ref => 'generic_1' + # ) do |client| + # # do stuff with the client + # end + # end class Client - # Open or create a connection to the message broker - # @param options [Hash] the connection options - # @return [Client, nil] the client object if no block is given - # The optional block supply {|client| block }. The client will - # be automatically closed when the block terminates + # Open or create a connection to the message broker. + # Expected +options+ keys are: + # * :protocol (Implemented: 'Stomp', 'Kafka'. Default 'Stomp') + # * :encoding ('yaml' or 'json'. Default 'yaml') + # Other connection options are underlying messaging system specific. # - # Avaiable type: + # Returns a +Client+ instance if no block is given. def self.open(options) protocol = options[:protocol] || :Stomp client = Object.const_get("ManageIQ::Messaging::#{protocol}::Client").new(options) @@ -21,22 +50,31 @@ def self.open(options) nil end - # Publish to a message to a queue. The message will be delivered to only one - # subscriber. - # @param options [Hash] the message attributes. Expected keys are: - # :service (service and affinity are used to determine the queue name) - # :affinity (optional) - # :class_name (optional) - # :message (e.g. method_name or message type) - # :payload (user defined structure, following are some examples) - # :instance_id - # :args - # :miq_callback - # :sender (optional, identify the sender) - # - # - # Optionally a call back block {|response| block} can be provided to wait on - # the consumer to send an acknowledgment. + # Publish a message to a queue. The message will be delivered to only one subscriber. + # Expected keys in +options+ are: + # * :service (service and affinity are used to determine the queue name) + # * :affinity (optional) + # * :class_name (optional) + # * :message (e.g. method name or message type) + # * :payload (message body, a string or an user object that can be serialized) + # * :sender (optional, identify the sender) + # Other options are underlying messaging system specific. + # + # Optionally a call back block can be provided to wait on the consumer to send + # an acknowledgment. Not every underlying messaging system supports callback. + # Example: + # + # client.publish_message( + # :service => 'ems_operation', + # :affinity => 'ems_amazon1', + # :message => 'power_on', + # :payload => { + # :ems_ref => 'u987', + # :id => '123' + # } + # ) do |result| + # ansible_install_pkg(vm1) if result == 'running' + # end def publish_message(options, &block) assert_options(options, [:message, :service]) @@ -44,47 +82,44 @@ def publish_message(options, &block) end # Publish multiple messages to a queue. - # An aggregate version of `#publish_message `but for better performance - # All messages are sent in a batch + # An aggregate version of +#publish_message+ but for better performance. + # All messages are sent in a batch. Every element in +messages+ array is + # an +options+ hash. # - # @param messages [Array] a collection of options for `#publish_message` def publish_messages(messages) publish_messages_impl(messages) end - # Subscribe to receive messages from a queue - # - # @param options [Hash] attributes to configure how to receive messages. - # Available keys are: - # :service (service and affinity are used to determine the queue) - # :affinity (optional) - # :limit (optional, receives up to limit messages into the buffer) - # - # A callback block {|messages| block} needs to be provided to consume the - # messages. Example - # subscribe_message(options) do |messages| - # messages.collect do |msg| - # # from msg you get + # Subscribe to receive messages from a queue. + # Expected keys in +options+ are: + # * :service (service and affinity are used to determine the queue) + # * :affinity (optional) + # Other options are underlying messaging system specific. + # + # A callback block is needed to consume the messages: + # + # client.subscribe_message(options) do |messages| + # messages.each do |msg| + # # msg is a type of ManageIQ::Messaging::ReceivedMessage + # # attributes in msg # msg.sender # msg.message # msg.payload - # msg.ack_ref (used to ack the message) + # msg.ack_ref #used to ack the message # # client.ack(msg.ack_ref) - # # process - # result # a result sent back to sender if expected + # # process the message # end # end # - # @note The subscriber MUST ack each message independently in the callback - # block. It can decide when to ack according to whether a message can - # be retried. Ack the message in the beginning of processing if the - # message is not re-triable; otherwise ack it after the message is done. - # Any un-acked message will be redelivered to next subscriber AFTER the - # current subscriber disconnects normally or abnormally (e.g. crashed). - # Make sure a message is properly acked whatever strategy you take. + # Some messaging systems require the subscriber to ack each message in the + # callback block. The code in the block can decide when to ack according + # to whether a message can be retried. Ack the message in the beginning of + # processing if the message is not re-triable; otherwise ack it after the + # message is done. Any un-acked message will be redelivered to next subscriber + # AFTER the current subscriber disconnects normally or abnormally (e.g. crashed). # - # To ack a message call `ack(msg.ack_ref)` + # To ack a message call +ack+(+msg.ack_ref+) def subscribe_messages(options, &block) raise "A block is required" unless block_given? assert_options(options, [:service]) @@ -93,11 +128,23 @@ def subscribe_messages(options, &block) end # Subscribe to receive from a queue and run each message as a background job. - # @param options [Hash] attributes to configure how to receive messages - # :service (service and affinity are used to determine the queue) - # :affinity (optional) + # Expected keys in +options+ are: + # * :service (service and affinity are used to determine the queue) + # * :affinity (optional) + # Other options are underlying messaging system specific. + # + # This subscriber consumes messages sent through +publish_message+ with required + # +options+ keys, for example: # - # This subscriber works only if the incoming message includes the class_name option + # client.publish_message( + # :service => 'generic', + # :class_name => 'MiqTask', + # :message => 'update_attributes', # method name, for instance method :instance_id is required + # :payload => { + # :instance_id => 2, # database id of class instance stored in rails DB + # :args => [{:status => 'Timeout'}] # argument list expected by the method + # } + # ) # # Background job assumes each job is not re-triable. It will ack as soon as a request # is received @@ -108,12 +155,12 @@ def subscribe_background_job(options) end # Publish a message as a topic. All subscribers will receive a copy of the message. - # @param options [Hash] the message attributes. Expected keys are: - # :service (service is used to determine the topic address) - # :event (event name) - # :payload (user defined structure that describes the event) - # :sender (optional, identify the sender) - # + # Expected keys in +options+ are: + # * :service (service is used to determine the topic address) + # * :event (event name) + # * :payload (message body, a string or an user object that can be serialized) + # * :sender (optional, identify the sender) + # Other options are underlying messaging system specific. # def publish_topic(options) assert_options(options, [:event, :service]) @@ -122,14 +169,19 @@ def publish_topic(options) end # Subscribe to receive topic type messages. - # @param options [Hash] attributes to configure how to receive messages - # :service (service is used to determine the topic address) - # :persist_ref (optional, client needs to be have client_ref to use this feature) + # Expected keys in +options+ are: + # * :service (service is used to determine the topic address) + # Other options are underlying messaging system specific. # - # Persisted event: In order to consume events missed during the period when the client is - # offline, the subscriber needs to be reconnect always with the same client_ref and persist_ref + # Some messaging systems allow subscribers to consume events missed during the period when + # the client is offline when they reconnect. Additional options are needed to turn on + # this feature. # - # A callback {|sender, event, payload| block } needs to be provided to consume the topic + # A callback block is needed to consume the topic: + # + # client.subcribe_topic(:service => 'provider_events') do |sender, event, payload| + # # sender, event, and payload are from publish_topic + # end # def subscribe_topic(options, &block) raise "A block is required" unless block_given? diff --git a/lib/manageiq/messaging/common.rb b/lib/manageiq/messaging/common.rb index d4febfe..a559dfb 100644 --- a/lib/manageiq/messaging/common.rb +++ b/lib/manageiq/messaging/common.rb @@ -1,6 +1,8 @@ module ManageIQ module Messaging module Common + private + def encode_body(headers, body) return body if body.kind_of?(String) headers[:encoding] = encoding diff --git a/lib/manageiq/messaging/kafka/client.rb b/lib/manageiq/messaging/kafka/client.rb index 8a7aa33..20726cd 100644 --- a/lib/manageiq/messaging/kafka/client.rb +++ b/lib/manageiq/messaging/kafka/client.rb @@ -1,6 +1,41 @@ module ManageIQ module Messaging module Kafka + # Messaging client implementation with Kafka being the underlying supporting system. + # Do not directly instantiate an instance from this class. Use + # +ManageIQ::Messaging::Client.open+ method. + # + # Kafka specific connection options accepted by +open+ method: + # * :client_ref (A reference string to identify the client) + # * :hosts (Array of Kafka cluster hosts, or) + # * :host (Single host name) + # * :port (host port number) + # * :ssl_ca_cert (security options) + # * :ssl_client_cert + # * :ssl_client_cert_key + # * :sasl_gssapi_principal + # * :sasl_gssapi_keytab + # * :sasl_plain_username + # * :sasl_plain_password + # * :sasl_scram_username + # * :sasl_scram_password + # * :sasl_scram_mechanism + # + # Kafka specific +publish_message+ options: + # * :group_name (Used as Kafka partition_key) + # + # Kafka specific +subscribe_topic+ options: + # * :persist_ref (Used as Kafka group_id) + # + # Without +:persist_ref+ every topic subscriber receives a copy of each message + # only when they are active. If multiple topic subscribers join with the same + # +:persist_ref+, each message is consumed by only one of the subscribers. This + # allows a load balancing among the subscribers. Also any messages sent when + # all members of the +:persist_ref+ group are offline will be persisted and delivered + # when any member in the group is back online. Each message is still copied and + # delivered to other subscribers that belongs to other +:persist_ref+ groups or no group. + # + # +subscribe_background_job+ is currently not implemented. class Client < ManageIQ::Messaging::Client require 'kafka' require 'manageiq/messaging/kafka/common' @@ -36,12 +71,6 @@ def close attr_reader :kafka_client - # @options options :host - # @options options :hosts (array) - # @options options :port - # @options options :client_ref (optional) - # @options options :encoding (default to 'yaml') - # @options options :ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key, :sasl_gssapi_principal, :sasl_gssapi_keytab, :sasl_plain_username, :sasl_plain_password, :sasl_scram_username, :sasl_scram_password, :sasl_scram_mechanism def initialize(options) hosts = Array(options[:hosts] || options[:host]) hosts.collect! { |host| "#{host}:#{options[:port]}" } diff --git a/lib/manageiq/messaging/kafka/queue.rb b/lib/manageiq/messaging/kafka/queue.rb index 3da7ae9..26bbbf1 100644 --- a/lib/manageiq/messaging/kafka/queue.rb +++ b/lib/manageiq/messaging/kafka/queue.rb @@ -2,6 +2,8 @@ module ManageIQ module Messaging module Kafka module Queue + private + def publish_message_impl(options) raise ArgumentError, "Kafka messaging implementation does not take a block" if block_given? raw_publish(true, *queue_for_publish(options)) diff --git a/lib/manageiq/messaging/stomp/client.rb b/lib/manageiq/messaging/stomp/client.rb index 002059d..ac5c714 100644 --- a/lib/manageiq/messaging/stomp/client.rb +++ b/lib/manageiq/messaging/stomp/client.rb @@ -1,6 +1,44 @@ module ManageIQ module Messaging module Stomp + # Messaging client implementation using Stomp protocol with ActiveMQ Artemis being + # the underlying supporting system. + # Do not directly instantiate an instance from this class. Use + # +ManageIQ::Messaging::Client.open+ method. + # + # Artemis specific connection options accepted by +open+ method: + # * :client_ref (A reference string to identify the client) + # * :host (Single host name) + # * :port (host port number) + # * :username + # * :password + # * :heartbeat (Whether the client should do heart-beating. Default to true) + # + # Artemis specific +publish_message+ options: + # * :expires_on + # * :deliver_on + # * :priority + # * :group_name + # + # Artemis specific +publish_topic+ options: + # * :expires_on + # * :deliver_on + # * :priority + # + # Artemis specific +subscribe_topic+ options: + # * :persist_ref + # + # +:persist_ref+ must be paired with +:client_ref+ option in +Client.open+ method. + # They jointly create a unique group name. Without such group every topic subscriber + # receives a copy of each message only when they are active. This is the default. + # If multiple topic subscribers join with the same group each message is consumed + # by only one of the subscribers. This allows a load balancing among the subscribers. + # Also any messages sent when all members of the group are offline will be persisted + # and delivered when any member in the group is back online. Each message is still + # copied and delivered to other subscribes belongs to other groups or no group. + # + # Artemis specific +subscribe_messages+ options: + # * :limit () class Client < ManageIQ::Messaging::Client require 'stomp' require 'manageiq/messaging/stomp/common' @@ -22,13 +60,6 @@ class Client < ManageIQ::Messaging::Client attr_reader :stomp_client - # @options options :host - # @options options :username - # @options options :password - # @options options :port - # @options options :client_ref (optional) - # @options options :heartbeat (optional, default to true) - # @options options :encoding (default to 'yaml') def initialize(options) host = options.slice(:host, :port) host[:passcode] = options[:password] if options[:password] diff --git a/lib/manageiq/messaging/stomp/queue.rb b/lib/manageiq/messaging/stomp/queue.rb index 74e378a..706010f 100644 --- a/lib/manageiq/messaging/stomp/queue.rb +++ b/lib/manageiq/messaging/stomp/queue.rb @@ -2,6 +2,8 @@ module ManageIQ module Messaging module Stomp module Queue + private + def publish_message_impl(options, &block) address, headers = queue_for_publish(options) headers[:sender] = options[:sender] if options[:sender] diff --git a/manageiq-messaging.gemspec b/manageiq-messaging.gemspec index b077ee5..160d259 100644 --- a/manageiq-messaging.gemspec +++ b/manageiq-messaging.gemspec @@ -21,8 +21,8 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_dependency 'activesupport', '>= 4.2.2' - spec.add_dependency 'ruby-kafka', '>=0.7.0' - spec.add_dependency 'stomp', '>= 1.4.4' + spec.add_dependency 'ruby-kafka', '~> 0.7.0' + spec.add_dependency 'stomp', '~> 1.4.4' spec.add_development_dependency "bundler", "~> 1.13" spec.add_development_dependency "codeclimate-test-reporter", "~> 1.0.0"