From 5174310538daf9adeeebf9a64ea04327327e3d39 Mon Sep 17 00:00:00 2001 From: Vladimir Kochnev Date: Mon, 6 Jul 2015 16:31:10 +0300 Subject: [PATCH] Add basic support for ConnectionPool --- README.md | 71 ++++++++++++++++++++++++++++++++++ her.gemspec | 1 + lib/her/api.rb | 40 ++++++++++++++++--- lib/her/api/connection_pool.rb | 24 ++++++++++++ spec/connection_pool_spec.rb | 63 ++++++++++++++++++++++++++++++ 5 files changed, 194 insertions(+), 5 deletions(-) create mode 100644 lib/her/api/connection_pool.rb create mode 100644 spec/connection_pool_spec.rb diff --git a/README.md b/README.md index 8f88186c..d6820472 100644 --- a/README.md +++ b/README.md @@ -873,6 +873,77 @@ Her::API.setup url: "https://api.example.com", ssl: ssl_options do |c| end ``` +### Persistent connection + +Her creates a connection instance only once but it's not a whole picture. In the [basic usage example](#usage) `Faraday::Adapter::NetHttp` is used. It makes Her use Ruby's standard `Net::HTTP` library which doesn't force you to install additional http client gems. However you should know that `Net::HTTP` connections are not persistent in Faraday (not *reusable* or not *keep-alive* in other words) so new TCP/IP connection is established for each requet. + +To avoid this problem you should use a different HTTP client. For example, there is a [httpclient](https://github.com/nahi/httpclient) gem which supports persistent connections. Add it to Gemfile: + +```ruby +# Gemfile +gem 'httpclient' +``` + +And configure Her to use its adapter: + +```ruby +Her::API.setup url: "https://api.example.com" do |c| + # Request + c.use Faraday::Request::UrlEncoded + + # Response + c.use Her::Middleware::DefaultParseJSON + + # Adapter + c.use Faraday::Adapter::HTTPClient +end +``` + +Other http clients that support persistent connections are [Typhoeus](https://github.com/typhoeus/typhoeus), [Patron](https://github.com/toland/patron) and [Net::HTTP::Persistent](https://github.com/drbrain/net-http-persistent). + +Corresponding Faraday adapters are: + +```ruby + require 'typhoeus/adapters/faraday' # Typhoeus has its own + c.use Faraday::Adapter::Typhoeus +``` +Or: +```ruby + c.use Faraday::Adapter::Patron +``` +Or: +```ruby + c.use Faraday::Adapter::NetHttpPersistent +``` + +### Connection pool + +If you're using Her inside threads (for example by using with [puma](https://github.com/puma/puma) or [Sidekiq](https://github.com/mperham/sidekiq)) then it's worth to use a connection pool. Her has a basic connection pool support relying on [connection_pool](https://github.com/mperham/connection_pool) gem. Add it to your project: + +```ruby +# Gemfile +gem 'connection_pool' +``` + +And then you are able to use `:pool_size` and `:pool_timeout` options in `Her::API.setup`. There is also a convenient helper `Her::API.setup_pool`: + +```ruby +Her::API.setup_pool 5, url: "https://api.example.com" do |c| + # Request + c.use Faraday::Request::UrlEncoded + + # Response + c.use Her::Middleware::DefaultParseJSON + + # Adapter + c.use Faraday::Adapter::HTTPClient +end +``` + +To take full advantage of concurrent connection pool make sure you have a [persistent connection](#persistent-connection). + +*Note:* If you're using a `Net::HTTP::Persistent` then there's no need to `setup_pool` because this client has its own pool. + ## Testing Suppose we have these two models bound to your API: diff --git a/her.gemspec b/her.gemspec index f41d14ff..a4b67094 100644 --- a/her.gemspec +++ b/her.gemspec @@ -22,6 +22,7 @@ Gem::Specification.new do |s| s.add_development_dependency "rspec-its", "~> 1.0" s.add_development_dependency "fivemat", "~> 1.2" s.add_development_dependency "json", "~> 1.8" + s.add_development_dependency "connection_pool", "~> 2.2" s.add_runtime_dependency "activemodel", ">= 3.0.0", "<= 4.3.0" s.add_runtime_dependency "activesupport", ">= 3.0.0", "<= 4.3.0" diff --git a/lib/her/api.rb b/lib/her/api.rb index b83d239e..69925de4 100644 --- a/lib/her/api.rb +++ b/lib/her/api.rb @@ -13,6 +13,15 @@ def self.setup(opts={}, &block) @default_api = new(opts, &block) end + # Setup a default API as a connection pool. + # + # @param [Fixnum] size maximum number of connections in the pool + # @param [Hash] opts the same options as in {API.setup} + # + def self.setup_pool(size, opts={}, &block) + @default_api = new(opts.merge(:pool_size => size), &block) + end + # Create a new API object. This is useful to create multiple APIs and use them with the `uses_api` method. # If your application uses only one API, you should use Her::API.setup to configure the default API # @@ -34,8 +43,10 @@ def initialize(*args, &blk) # @param [Hash] opts the Faraday options # @option opts [String] :url The main HTTP API root (eg. `https://api.example.com`) # @option opts [String] :ssl A hash containing [SSL options](https://github.com/lostisland/faraday/wiki/Setting-up-SSL-certificates) + # @option opts [Fixnum] :pool_size Size of connection pool + # @option opts [Fixnum] :pool_timeout Timeout of connection pool # - # @return Faraday::Connection + # @return Her::API # # @example Setting up the default API connection # Her::API.setup :url => "https://api.example" @@ -71,11 +82,14 @@ def initialize(*args, &blk) def setup(opts={}, &blk) opts[:url] = opts.delete(:base_uri) if opts.include?(:base_uri) # Support legacy :base_uri option @options = opts + @faraday_options = @options.slice(*FARADAY_OPTIONS) + @faraday_block = blk - faraday_options = @options.reject { |key, value| !FARADAY_OPTIONS.include?(key.to_sym) } - @connection = Faraday.new(faraday_options) do |connection| - yield connection if block_given? - end + @connection = if opts[:pool_size] || opts[:pool_timeout] + make_faraday_pool + else + make_faraday_connection + end self end @@ -109,5 +123,21 @@ def request(opts={}) def self.default_api(opts={}) defined?(@default_api) ? @default_api : nil end + + # @private + def make_faraday_pool + require 'her/api/connection_pool' + pool_options = {} + pool_options[:size] = @options[:pool_size] if @options[:pool_size] + pool_options[:timeout] = @options[:pool_timeout] if @options[:pool_timeout] + ConnectionPool.new(pool_options) do + make_faraday_connection + end + end + + # @private + def make_faraday_connection + Faraday.new(@faraday_options, &@faraday_block) + end end end diff --git a/lib/her/api/connection_pool.rb b/lib/her/api/connection_pool.rb new file mode 100644 index 00000000..097bf072 --- /dev/null +++ b/lib/her/api/connection_pool.rb @@ -0,0 +1,24 @@ +begin + require 'connection_pool' +rescue LoadError + fail "'connection_pool' gem is required to use Her::API's pool_size and pool_timeout options" +end +require 'her/model/http' + +module Her + class API + class ConnectionPool < ::ConnectionPool + DELEGATED_METHODS = Model::HTTP::METHODS + + DELEGATED_METHODS.each do |method| + class_eval <<-RUBY, __FILE__, __LINE__ + 1 + def #{method}(*args, &blk) + with do |conn| + conn.#{method}(*args, &blk) + end + end + RUBY + end + end + end +end diff --git a/spec/connection_pool_spec.rb b/spec/connection_pool_spec.rb new file mode 100644 index 00000000..8207e035 --- /dev/null +++ b/spec/connection_pool_spec.rb @@ -0,0 +1,63 @@ +# encoding: utf-8 +require File.join(File.dirname(__FILE__), "spec_helper.rb") +require 'her/api/connection_pool' + +describe Her::API::ConnectionPool do + it 'delegates http verb methods to connection' do + connection = double + pool = described_class.new { connection } + expect(connection).to receive(:get) + expect(connection).to receive(:post) + expect(connection).to receive(:put) + expect(connection).to receive(:patch) + expect(connection).to receive(:delete) + pool.get('/lol') + pool.post('/lol') + pool.put('/lol') + pool.patch('/lol') + pool.delete('/lol') + end + + describe 'when using with API' do + subject { Her::API.new } + + before do + i = -1 + mutex = Mutex.new + subject.setup :pool_size => 5, :url => "https://api.example.com" do |builder| + builder.adapter(:test) do |stub| + stub.get("/foo") do |env| + sleep 0.025 # simulate slow response + body = mutex.synchronize do + "Foo, it is #{i += 1}." + end + [200, {}, body] + end + end + end + end + + its(:options) { should == {:pool_size => 5, :url => "https://api.example.com"} } + + it 'creates only `pool_size` connections' do + should receive(:make_faraday_connection).exactly(5).times.and_call_original + threads = 10.times.map do + Thread.new do + subject.request(:_method => :get, :_path => "/foo") + end + end + threads.each(&:join) + end + + it 'just does the same thing as a single connection' do + threads = 10.times.map do + Thread.new do + subject.request(:_method => :get, :_path => "/foo") + end + end + values = threads.map { |t| t.value[:parsed_data] }.sort + expected_values = 10.times.map { |i| "Foo, it is #{i}." } + expect(values).to eq(expected_values) + end + end +end