From 95cb0f6d2373561c12698dce79c046072dce5863 Mon Sep 17 00:00:00 2001 From: Ryan Garver Date: Thu, 5 May 2016 15:20:42 -0700 Subject: [PATCH] Introduce postgres transport. This is pretty much a read only version so far. --- forklift_etl.gemspec | 2 + lib/forklift/transports/pg.rb | 171 ++++++++++++++++++ .../pg/forklift_test_destination.yml | 3 + .../connections/pg/forklift_test_source_a.yml | 3 + .../connections/pg/forklift_test_source_b.yml | 3 + .../connections/pg/forklift_test_working.yml | 3 + spec/integration/multi_transport_spec.rb | 58 +++--- spec/spec_helper.rb | 1 + .../dumps/pg/forklift_test_source_a.sql | 55 ++++++ .../dumps/pg/forklift_test_source_b.sql | 16 ++ spec/support/spec_client.rb | 16 +- spec/support/spec_seeds.rb | 30 ++- spec/unit/connection/pg_spec.rb | 118 ++++++++++++ 13 files changed, 453 insertions(+), 26 deletions(-) create mode 100644 lib/forklift/transports/pg.rb create mode 100644 spec/config/connections/pg/forklift_test_destination.yml create mode 100644 spec/config/connections/pg/forklift_test_source_a.yml create mode 100644 spec/config/connections/pg/forklift_test_source_b.yml create mode 100644 spec/config/connections/pg/forklift_test_working.yml create mode 100644 spec/support/dumps/pg/forklift_test_source_a.sql create mode 100644 spec/support/dumps/pg/forklift_test_source_b.sql create mode 100644 spec/unit/connection/pg_spec.rb diff --git a/forklift_etl.gemspec b/forklift_etl.gemspec index 1a7f558..1b74f70 100644 --- a/forklift_etl.gemspec +++ b/forklift_etl.gemspec @@ -28,4 +28,6 @@ Gem::Specification.new do |s| s.add_development_dependency 'rake' s.add_development_dependency 'rspec' s.add_development_dependency 'email_spec' + s.add_development_dependency 'pry-byebug' + s.add_development_dependency 'pg' end diff --git a/lib/forklift/transports/pg.rb b/lib/forklift/transports/pg.rb new file mode 100644 index 0000000..9c1357e --- /dev/null +++ b/lib/forklift/transports/pg.rb @@ -0,0 +1,171 @@ +module Forklift + module 
Connection + class Pg < Forklift::Base::Connection + def initialize(config, forklift) + begin + require 'pg' + rescue LoadError + raise "To use the postgres connection you must add 'pg' to your Gemfile" + end + super(config, forklift) + end + + def connect + @client ||= PG::Connection.new(config) + end + + def disconnect + client.close + end + + def default_matcher + 'updated_at' + end + + def drop!(table) + q("DROP TABLE IF EXISTS #{quote_ident(table)}") + end + + def rename(table, new_table) + q("ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(new_table)}") + end + + def read(query, database=current_database, looping=true, limit=forklift.config[:batch_size], offset=0) + page = 1 + loop do + result = q(paginate_query(query, page, limit)) + + block_given? ? yield(result) : (return result) + return result if result.num_tuples < limit || !looping + page += 1 + end + end + + def write(rows, table, to_update=true, database=current_database, primary_key='id', lazy=true, crash_on_extral_col=false) + if tables.include? table + ensure_row_types(rows, table, database) + elsif lazy && rows.length > 0 + lazy_table_create(table, rows, database, primary_key) + end + + insert_values = [] + delete_keys = [] + rows.map do |row| + if to_update && !row[primary_key].nil? + delete_keys << row[primary_key] + else + insert_values << safe_values(columns, row) + end + end + + unless delete_keys.empty? 
+ q(%{DELETE FROM #{quote_ident(table)} WHERE #{quote_ident(primary_key)} IN (#{delete_keys.join(',')})}) + end + + q(%{INSERT INTO #{quote_ident(table)} (#{safe_columns(columns)}) VALUES #{insert_values.join(',')}}) + forklift.logger.log "wrote #{rows.length} rows to `#{database}`.`#{table}`" + end + + # @todo + def lazy_table_create(table, data, database=current_database, primary_key='id', matcher=default_matcher) + raise NotImplementedError.new + end + + # @todo + def sql_type(v) + raise NotImplementedError.new + end + + def read_since(table, since, matcher=default_matcher, database=current_database, limit=forklift.config[:batch_size]) + query = %{SELECT * FROM #{quote_ident(table)} WHERE #{quote_ident(matcher)} >= #{client.escape_literal(since)} ORDER BY #{quote_ident(matcher)} ASC} + self.read(query, database, true, limit) do |rows| + if block_given? + yield rows + else + return rows + end + end + end + + def max_timestamp(table, matcher=default_matcher) + row = q(%{SELECT max(#{quote_ident(matcher)}) AS 'matcher' FROM #{quote_ident(table)}}).first + (row && row['matcher']) || Time.at(0) + end + + def tables + table_list = [] + read(%{SELECT table_name AS "table_name" FROM information_schema.tables WHERE table_schema = 'public'}) do |result| + table_list << result.map{|r| r['table_name']} + end + table_list.flatten.compact + end + + def current_database + client.db + end + + def count(table) + q(%{SELECT count(1) AS "count" FROM #{quote_ident(table)}})[0]['count'].to_i + end + + def truncate!(table) + q("TRUNCATE TABLE #{quote_ident(table)}") + end + + def truncate(table) + begin + self.truncate!(table) + rescue Exception => e + forklift.logger.debug e + end + end + + def columns(table, database=current_database, return_types=false) + columns = {} + read(%{SELECT column_name, data_type, character_maximum_length FROM "information_schema"."columns" WHERE table_name='#{table}'}) do |rows| + rows.each do |row| + type = case row['data_type'] + when 'character 
varying' then "varchar(#{row['character_maximum_length']})" + else row['data_type'] + end + columns[row['column_name']] = type + end + end + return_types ? columns : columns.keys + end + + def dump(file, options=[]) + end + + def exec_script(path) + end + + def q(query, options={}) + forklift.logger.debug "\tSQL[#{config[:database]}]: #{query}" + client.exec(query) + end + + private + def ensure_row_types(rows, table, database=current_database) + columns = columns(table, database) + rows.each do |row| + row.each do |column, value| + unless columns.include?(column) + q(%{ALTER TABLE #{quote_ident(table)} ADD #{quote_ident(column)} #{sql_type(value)} NULL DEFAULT NULL}) + columns = columns(table, database) + end + end + end + end + + def paginate_query(query, page, page_size) + offset = (page-1) * page_size + [query, "LIMIT #{page_size} OFFSET #{offset}"].join(' ') + end + + def quote_ident(table) + PG::Connection.quote_ident(table) + end + end + end +end diff --git a/spec/config/connections/pg/forklift_test_destination.yml b/spec/config/connections/pg/forklift_test_destination.yml new file mode 100644 index 0000000..e4761ad --- /dev/null +++ b/spec/config/connections/pg/forklift_test_destination.yml @@ -0,0 +1,3 @@ +:dbname: forklift_test_destination +:host: 127.0.0.1 +:port: 5432 diff --git a/spec/config/connections/pg/forklift_test_source_a.yml b/spec/config/connections/pg/forklift_test_source_a.yml new file mode 100644 index 0000000..9fae817 --- /dev/null +++ b/spec/config/connections/pg/forklift_test_source_a.yml @@ -0,0 +1,3 @@ +:dbname: forklift_test_source_a +:host: 127.0.0.1 +:port: 5432 diff --git a/spec/config/connections/pg/forklift_test_source_b.yml b/spec/config/connections/pg/forklift_test_source_b.yml new file mode 100644 index 0000000..f2c572a --- /dev/null +++ b/spec/config/connections/pg/forklift_test_source_b.yml @@ -0,0 +1,3 @@ +:dbname: forklift_test_source_b +:host: 127.0.0.1 +:port: 5432 diff --git 
a/spec/config/connections/pg/forklift_test_working.yml b/spec/config/connections/pg/forklift_test_working.yml new file mode 100644 index 0000000..7341f04 --- /dev/null +++ b/spec/config/connections/pg/forklift_test_working.yml @@ -0,0 +1,3 @@ +:dbname: forklift_test_working +:host: 127.0.0.1 +:port: 5432 diff --git a/spec/integration/multi_transport_spec.rb b/spec/integration/multi_transport_spec.rb index 3487219..937b988 100644 --- a/spec/integration/multi_transport_spec.rb +++ b/spec/integration/multi_transport_spec.rb @@ -1,28 +1,27 @@ require 'spec_helper' describe 'multiple trasport types' do + describe 'elasticsearch => mysql' do + before(:each) do + SpecSeeds.setup_mysql + SpecSeeds.setup_elasticsearch + end - before(:each) do - SpecSeeds.setup_mysql - SpecSeeds.setup_elasticsearch - end - - describe 'elasticsearch => mysql' do it 'can load in a full query' do table = 'es_import' index = 'forklift_test' query = { query: { match_all: {} } } - plan = SpecPlan.new - plan.do! { - source = plan.connections[:elasticsearch][:forklift_test] - destination = plan.connections[:mysql][:forklift_test_destination] - source.read(index, query) {|data| destination.write(data, table) } - } - plan.disconnect! - - destination = SpecClient.mysql('forklift_test_destination') - rows = destination.query("select count(1) as 'count' from es_import").first["count"] - expect(rows).to eql 5 + begin + plan = SpecPlan.new + plan.do! { + source = plan.connections[:elasticsearch][:forklift_test] + destination = plan.connections[:mysql][:forklift_test_destination] + source.read(index, query) {|data| destination.write(data, table) } + expect(destination.count('es_import')).to eql(5) + } + ensure + plan.disconnect! + end end it 'can load in a partial query' do @@ -54,13 +53,13 @@ plan.do! 
{ source = plan.connections[:elasticsearch][:forklift_test] destination = plan.connections[:mysql][:forklift_test_destination] - source.read(index, query) {|data| + source.read(index, query) {|data| clean_data = [] data.each do |row| row[:viewed_at] = Time.at(row[:viewed_at]) clean_data << row end - destination.write(clean_data, table) + destination.write(clean_data, table) } } plan.disconnect! @@ -72,7 +71,11 @@ end - describe 'mysql => elasticsearch' do + describe 'mysql => elasticsearch' do + before(:each) do + SpecSeeds.setup_mysql + SpecSeeds.setup_elasticsearch + end after(:each) do es = SpecClient.elasticsearch('forklift_test') @@ -94,7 +97,7 @@ count = destination.count({ index: index })["count"] expect(count).to eql 5 end - + it 'can load in only some rows' do table = 'users' index = 'users' @@ -102,7 +105,7 @@ plan.do! { source = plan.connections[:mysql][:forklift_test_source_a] destination = plan.connections[:elasticsearch][:forklift_test] - source.read("select * from #{table}", source.current_database, false, 3, 0) {|data| + source.read("select * from #{table}", source.current_database, false, 3, 0) {|data| destination.write(data, index) } } @@ -114,4 +117,13 @@ end end -end \ No newline at end of file + describe 'postgres => mysql' do + before do + SpecSeeds.setup_mysql + SpecSeeds.setup_postgres + end + + it 'can load in a full table' + it 'can load in only some rows' + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 72a59f8..575a0a8 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -12,6 +12,7 @@ require 'forklift' require 'rspec' require 'fileutils' +require 'pry-byebug' ENV["FORKLIFT_RUN_ALL_STEPS"] = 'true' diff --git a/spec/support/dumps/pg/forklift_test_source_a.sql b/spec/support/dumps/pg/forklift_test_source_a.sql new file mode 100644 index 0000000..66ed9e2 --- /dev/null +++ b/spec/support/dumps/pg/forklift_test_source_a.sql @@ -0,0 +1,55 @@ +SET client_min_messages TO WARNING; +DROP TABLE IF EXISTS 
"products"; + +CREATE TABLE "products" ( + "id" SERIAL NOT NULL PRIMARY KEY, + "name" varchar(255) NOT NULL DEFAULT '', + "description" text NOT NULL, + "inventory" integer DEFAULT NULL, + "created_at" timestamp NOT NULL, + "updated_at" timestamp NOT NULL +); + +INSERT INTO "products" ("id", "name", "description", "inventory", "created_at", "updated_at") +VALUES + (1,'car','a car',10,'2014-04-03 11:45:51','2014-04-03 11:45:51'), + (2,'boat','a boat',3,'2014-04-03 11:45:52','2014-04-03 11:45:52'), + (3,'bus','a bus',5,'2014-04-03 11:45:54','2014-04-03 11:45:54'), + (4,'motorcycle','a motorcycle',23,'2014-04-03 11:45:56','2014-04-03 11:45:56'), + (5,'hang_glider','awesome',2,'2014-04-03 11:46:19','2014-04-03 11:46:19'); + +DROP TABLE IF EXISTS "sales"; + +CREATE TABLE "sales" ( + "id" SERIAL NOT NULL PRIMARY KEY, + "user_id" integer NOT NULL, + "product_id" integer NOT NULL, + "timestamp" timestamp NOT NULL +); + +INSERT INTO "sales" ("id", "user_id", "product_id", "timestamp") +VALUES + (1,1,1,'2014-04-03 11:47:11'), + (2,1,2,'2014-04-03 11:47:11'), + (3,4,5,'2014-04-03 11:47:12'), + (4,4,4,'2014-04-03 11:47:25'), + (5,5,5,'2014-04-03 11:47:26'); + +DROP TABLE IF EXISTS "users"; + +CREATE TABLE "users" ( + "id" SERIAL NOT NULL PRIMARY KEY, + "email" varchar(255) NOT NULL DEFAULT '', + "first_name" varchar(255) NOT NULL DEFAULT '', + "last_name" varchar(255) NOT NULL DEFAULT '', + "created_at" timestamp NOT NULL, + "updated_at" timestamp NOT NULL +); + +INSERT INTO "users" ("id", "email", "first_name", "last_name", "created_at", "updated_at") +VALUES + (1,'evan@example.com','Evan','T','2014-04-03 11:40:12','2014-04-03 11:39:28'), + (2,'pablo@example.com','Pablo ','J','2014-04-03 11:41:08','2014-04-03 11:41:08'), + (3,'kevin@example.com','Kevin','B','2014-04-03 11:41:10','2014-04-03 11:41:10'), + (4,'brian@example.com','Brian','L','2014-04-03 11:41:12','2014-04-03 11:41:12'), + (5,'aaront@example.com','Aaron','B','2014-04-03 11:41:13','2014-04-03 11:41:13'); diff 
# Builds a PG::Connection for the named spec config, first dropping and
# re-creating the database so every spec run starts from a clean slate.
# @param name [String] basename of a yml file under spec/config/connections/pg
# @return [PG::Connection] connection to the freshly created database
def self.pg(name)
  file = File.join(File.dirname(__FILE__), '..', 'config', 'connections', 'pg', "#{name}.yml")
  config = self.load_config(file)
  db = config[:dbname]
  # Administer via the maintenance db; the target db may not exist yet.
  admin = ::PG::Connection.new(config.merge(dbname: 'postgres'))
  begin
    # Bug fix: quote the identifier instead of interpolating it raw.
    safe_db = ::PG::Connection.quote_ident(db)
    admin.exec(%{DROP DATABASE IF EXISTS #{safe_db}})
    admin.exec(%{CREATE DATABASE #{safe_db}})
  ensure
    # Bug fix: the admin connection leaked when either exec raised.
    admin.close
  end

  ::PG::Connection.new(config)
end
# Drops/re-creates every pg spec database and seeds it from the matching
# dump under spec/support/dumps/pg, keeping the open connections in
# @pg_connections for teardown_pg to close.
def self.setup_pg
  @pg_connections = []
  pg_databases = []

  files = Dir["#{File.dirname(__FILE__)}/../config/connections/pg/*.yml"]
  files.each do |f|
    name = File.basename(f, '.yml')
    @pg_connections << ::SpecClient.pg(name)
    pg_databases << name
  end

  @pg_connections.each do |conn|
    db = conn.db
    seed = File.join(File.dirname(__FILE__), '..', 'support', 'dumps', 'pg', "#{db}.sql")
    # Bug fix: File.exists? was deprecated and removed in Ruby 3.2.
    next unless File.exist?(seed)
    # Naive statement split — fine for these dumps, which contain no
    # literal semicolons or procedural bodies.
    File.read(seed).split(';').each do |statement|
      statement = statement.strip
      # Skip blank trailing fragments and commented-out statements.
      next if statement.empty? || statement.start_with?('#')
      conn.exec(statement)
    end
  end
end

# Closes and forgets every connection opened by setup_pg.
def self.teardown_pg
  @pg_connections.each(&:close)
  @pg_connections.clear
end
table + expect(source.tables).to_not include table + } + plan.disconnect! + end + + it "can count the rows in a table" do + plan = SpecPlan.new + table = "users" + plan.do! { + source = plan.connections[:pg][:forklift_test_source_a] + expect(source.count(table)).to eql 5 + } + plan.disconnect! + end + + it "can truncate a table (both with and without !)" do + plan = SpecPlan.new + table = "users" + plan.do! { + source = plan.connections[:pg][:forklift_test_source_a] + expect(source.count(table)).to eql 5 + source.truncate! table + expect(source.count(table)).to eql 0 + expect { source.truncate(table) }.to_not raise_error + } + plan.disconnect! + end + + it 'truncate! will raise if the table does not exist' do + plan = SpecPlan.new + table = "other_table" + plan.do! { + source = plan.connections[:pg][:forklift_test_source_a] + expect { source.truncate!(table) }.to raise_error(/ERROR: relation "other_table" does not exist/) + } + plan.disconnect! + end + + it "can get the columns of a table" do + plan = SpecPlan.new + table = "sales" + plan.do! { + source = plan.connections[:pg][:forklift_test_source_a] + columns = source.columns(table) + expect(columns).to include 'id' + expect(columns).to include 'user_id' + expect(columns).to include 'product_id' + expect(columns).to include 'timestamp' + } + plan.disconnect! + end + + it "can create a mysqldump" do + dump = "/tmp/destination.sql.gz" + plan = SpecPlan.new + plan.do! { + source = plan.connections[:pg][:forklift_test_source_a] + source.dump(dump) + } + plan.disconnect! 
+ + expect(File.exists?(dump)).to eql true + contents = Zlib::GzipReader.new(StringIO.new(File.read(dump))).read + expect(contents).to include "(1,'evan@example.com','Evan','T','2014-04-03 11:40:12','2014-04-03 11:39:28')" + end + + end + + describe "#safe_values" do + pending 'PG getting write support' + #subject { described_class.new({}, {}) } + + #it "escapes one trailing backslash" do + #columns = ['col'] + #values = {'col' => "foo\\"} + #expect(subject.send(:safe_values, columns, values)).to eq("(\"foo\\\\\")") + #end + + #it "escapes two trailing backslashes" do + #columns = ['col'] + #values = {'col' => "foo\\\\" } + #expect(subject.send(:safe_values, columns, values)).to eq("(\"foo\\\\\\\\\")") + #end + end +end