Skip to content

Commit

Permalink
Introduce postgres transport. This is pretty much a read only version…
Browse files Browse the repository at this point in the history
… so far.
  • Loading branch information
rgarver committed May 6, 2016
1 parent 5c2e10e commit c64f1bb
Show file tree
Hide file tree
Showing 13 changed files with 452 additions and 26 deletions.
1 change: 1 addition & 0 deletions forklift_etl.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ Gem::Specification.new do |s|
s.add_development_dependency 'rake'
s.add_development_dependency 'rspec'
s.add_development_dependency 'email_spec'
s.add_development_dependency 'pg'
end
171 changes: 171 additions & 0 deletions lib/forklift/transports/pg.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
module Forklift
module Connection
class Pg < Forklift::Base::Connection
def initialize(config, forklift)
begin
require 'pg'
rescue LoadError
raise "To use the postgres connection you must add 'pg' to your Gemfile"
end
super(config, forklift)
end

def connect
@client ||= PG::Connection.new(config)
end

def disconnect
client.close
end

def default_matcher
'updated_at'
end

def drop!(table)
q("DROP TABLE IF EXISTS #{quote_ident(table)}")
end

def rename(table, new_table)
q("ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(new_table)}")
end

def read(query, database=current_database, looping=true, limit=forklift.config[:batch_size], offset=0)
page = 1
loop do
result = q(paginate_query(query, page, limit))

block_given? ? yield(result) : (return result)
return result if result.num_tuples < limit || !looping
page += 1
end
end

def write(rows, table, to_update=true, database=current_database, primary_key='id', lazy=true, crash_on_extral_col=false)
if tables.include? table
ensure_row_types(rows, table, database)
elsif lazy && rows.length > 0
lazy_table_create(table, rows, database, primary_key)
end

insert_values = []
delete_keys = []
rows.map do |row|
if to_update && !row[primary_key].nil?
delete_keys << row[primary_key]
else
insert_values << safe_values(columns, row)
end
end

unless delete_keys.empty?
q(%{DELETE FROM #{quote_ident(table)} WHERE #{quote_ident(primary_key)} IN (#{delete_keys.join(',')})})
end

q(%{INSERT INTO #{quote_ident(table)} (#{safe_columns(columns)}) VALUES #{insert_values.join(',')}})
forklift.logger.log "wrote #{rows.length} rows to `#{database}`.`#{table}`"
end

# @todo
def lazy_table_create(table, data, database=current_database, primary_key='id', matcher=default_matcher)
raise NotImplementedError.new
end

# @todo
def sql_type(v)
raise NotImplementedError.new
end

def read_since(table, since, matcher=default_matcher, database=current_database, limit=forklift.config[:batch_size])
query = %{SELECT * FROM #{quote_ident(table)} WHERE #{quote_ident(matcher)} >= #{client.escape_literal(since)} ORDER BY #{quote_ident(matcher)} ASC}
self.read(query, database, true, limit) do |rows|
if block_given?
yield rows
else
return rows
end
end
end

def max_timestamp(table, matcher=default_matcher)
row = q(%{SELECT max(#{quote_ident(matcher)}) AS 'matcher' FROM #{quote_ident(table)}}).first
(row && row['matcher']) || Time.at(0)
end

def tables
table_list = []
read(%{SELECT table_name AS "table_name" FROM information_schema.tables WHERE table_schema = 'public'}) do |result|
table_list << result.map{|r| r['table_name']}
end
table_list.flatten.compact
end

def current_database
client.db
end

def count(table)
q(%{SELECT count(1) AS "count" FROM #{quote_ident(table)}})[0]['count'].to_i
end

def truncate!(table)
q("TRUNCATE TABLE #{quote_ident(table)}")
end

def truncate(table)
begin
self.truncate!(table)
rescue Exception => e
forklift.logger.debug e
end
end

def columns(table, database=current_database, return_types=false)
columns = {}
read(%{SELECT column_name, data_type, character_maximum_length FROM "information_schema"."columns" WHERE table_name='#{table}'}) do |rows|
rows.each do |row|
type = case row['data_type']
when 'character varying' then "varchar(#{row['character_maximum_length']})"
else row['data_type']
end
columns[row['column_name']] = type
end
end
return_types ? columns : columns.keys
end

def dump(file, options=[])
end

def exec_script(path)
end

def q(query, options={})
forklift.logger.debug "\tSQL[#{config[:database]}]: #{query}"
client.exec(query)
end

private
def ensure_row_types(rows, table, database=current_database)
columns = columns(table, database)
rows.each do |row|
row.each do |column, value|
unless columns.include?(column)
q(%{ALTER TABLE #{quote_ident(table)} ADD #{quote_ident(column)} #{sql_type(value)} NULL DEFAULT NULL})
columns = columns(table, database)
end
end
end
end

def paginate_query(query, page, page_size)
offset = (page-1) * page_size
[query, "LIMIT #{page_size} OFFSET #{offset}"].join(' ')
end

def quote_ident(table)
PG::Connection.quote_ident(table)
end
end
end
end
3 changes: 3 additions & 0 deletions spec/config/connections/pg/forklift_test_destination.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
:dbname: forklift_test_destination
:host: 127.0.0.1
:port: 5432
3 changes: 3 additions & 0 deletions spec/config/connections/pg/forklift_test_source_a.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
:dbname: forklift_test_source_a
:host: 127.0.0.1
:port: 5432
3 changes: 3 additions & 0 deletions spec/config/connections/pg/forklift_test_source_b.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
:dbname: forklift_test_source_b
:host: 127.0.0.1
:port: 5432
3 changes: 3 additions & 0 deletions spec/config/connections/pg/forklift_test_working.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
:dbname: forklift_test_working
:host: 127.0.0.1
:port: 5432
58 changes: 35 additions & 23 deletions spec/integration/multi_transport_spec.rb
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
require 'spec_helper'

describe 'multiple trasport types' do
describe 'elasticsearch => mysql' do
before(:each) do
SpecSeeds.setup_mysql
SpecSeeds.setup_elasticsearch
end

before(:each) do
SpecSeeds.setup_mysql
SpecSeeds.setup_elasticsearch
end

describe 'elasticsearch => mysql' do
it 'can load in a full query' do
table = 'es_import'
index = 'forklift_test'
query = { query: { match_all: {} } }
plan = SpecPlan.new
plan.do! {
source = plan.connections[:elasticsearch][:forklift_test]
destination = plan.connections[:mysql][:forklift_test_destination]
source.read(index, query) {|data| destination.write(data, table) }
}
plan.disconnect!

destination = SpecClient.mysql('forklift_test_destination')
rows = destination.query("select count(1) as 'count' from es_import").first["count"]
expect(rows).to eql 5
begin
plan = SpecPlan.new
plan.do! {
source = plan.connections[:elasticsearch][:forklift_test]
destination = plan.connections[:mysql][:forklift_test_destination]
source.read(index, query) {|data| destination.write(data, table) }
expect(destination.count('es_import')).to eql(5)
}
ensure
plan.disconnect!
end
end

it 'can load in a partial query' do
Expand Down Expand Up @@ -54,13 +53,13 @@
plan.do! {
source = plan.connections[:elasticsearch][:forklift_test]
destination = plan.connections[:mysql][:forklift_test_destination]
source.read(index, query) {|data|
source.read(index, query) {|data|
clean_data = []
data.each do |row|
row[:viewed_at] = Time.at(row[:viewed_at])
clean_data << row
end
destination.write(clean_data, table)
destination.write(clean_data, table)
}
}
plan.disconnect!
Expand All @@ -72,7 +71,11 @@

end

describe 'mysql => elasticsearch' do
describe 'mysql => elasticsearch' do
before(:each) do
SpecSeeds.setup_mysql
SpecSeeds.setup_elasticsearch
end

after(:each) do
es = SpecClient.elasticsearch('forklift_test')
Expand All @@ -94,15 +97,15 @@
count = destination.count({ index: index })["count"]
expect(count).to eql 5
end

it 'can load in only some rows' do
table = 'users'
index = 'users'
plan = SpecPlan.new
plan.do! {
source = plan.connections[:mysql][:forklift_test_source_a]
destination = plan.connections[:elasticsearch][:forklift_test]
source.read("select * from #{table}", source.current_database, false, 3, 0) {|data|
source.read("select * from #{table}", source.current_database, false, 3, 0) {|data|
destination.write(data, index)
}
}
Expand All @@ -114,4 +117,13 @@
end
end

end
describe 'postgres => mysql' do
before do
SpecSeeds.setup_mysql
SpecSeeds.setup_postgres
end

it 'can load in a full table'
it 'can load in only some rows'
end
end
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
require 'forklift'
require 'rspec'
require 'fileutils'
require 'pry-byebug'

ENV["FORKLIFT_RUN_ALL_STEPS"] = 'true'

Expand Down
55 changes: 55 additions & 0 deletions spec/support/dumps/pg/forklift_test_source_a.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
SET client_min_messages TO WARNING;
DROP TABLE IF EXISTS "products";

CREATE TABLE "products" (
"id" SERIAL NOT NULL PRIMARY KEY,
"name" varchar(255) NOT NULL DEFAULT '',
"description" text NOT NULL,
"inventory" integer DEFAULT NULL,
"created_at" timestamp NOT NULL,
"updated_at" timestamp NOT NULL
);

INSERT INTO "products" ("id", "name", "description", "inventory", "created_at", "updated_at")
VALUES
(1,'car','a car',10,'2014-04-03 11:45:51','2014-04-03 11:45:51'),
(2,'boat','a boat',3,'2014-04-03 11:45:52','2014-04-03 11:45:52'),
(3,'bus','a bus',5,'2014-04-03 11:45:54','2014-04-03 11:45:54'),
(4,'motorcycle','a motorcycle',23,'2014-04-03 11:45:56','2014-04-03 11:45:56'),
(5,'hang_glider','awesome',2,'2014-04-03 11:46:19','2014-04-03 11:46:19');

DROP TABLE IF EXISTS "sales";

CREATE TABLE "sales" (
"id" SERIAL NOT NULL PRIMARY KEY,
"user_id" integer NOT NULL,
"product_id" integer NOT NULL,
"timestamp" timestamp NOT NULL
);

INSERT INTO "sales" ("id", "user_id", "product_id", "timestamp")
VALUES
(1,1,1,'2014-04-03 11:47:11'),
(2,1,2,'2014-04-03 11:47:11'),
(3,4,5,'2014-04-03 11:47:12'),
(4,4,4,'2014-04-03 11:47:25'),
(5,5,5,'2014-04-03 11:47:26');

DROP TABLE IF EXISTS "users";

CREATE TABLE "users" (
"id" SERIAL NOT NULL PRIMARY KEY,
"email" varchar(255) NOT NULL DEFAULT '',
"first_name" varchar(255) NOT NULL DEFAULT '',
"last_name" varchar(255) NOT NULL DEFAULT '',
"created_at" timestamp NOT NULL,
"updated_at" timestamp NOT NULL
);

INSERT INTO "users" ("id", "email", "first_name", "last_name", "created_at", "updated_at")
VALUES
(1,'[email protected]','Evan','T','2014-04-03 11:40:12','2014-04-03 11:39:28'),
(2,'[email protected]','Pablo ','J','2014-04-03 11:41:08','2014-04-03 11:41:08'),
(3,'[email protected]','Kevin','B','2014-04-03 11:41:10','2014-04-03 11:41:10'),
(4,'[email protected]','Brian','L','2014-04-03 11:41:12','2014-04-03 11:41:12'),
(5,'[email protected]','Aaron','B','2014-04-03 11:41:13','2014-04-03 11:41:13');
16 changes: 16 additions & 0 deletions spec/support/dumps/pg/forklift_test_source_b.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
SET client_min_messages TO WARNING;
DROP TABLE IF EXISTS "admin_notes";

CREATE TABLE "admin_notes" (
"id" SERIAL NOT NULL PRIMARY KEY,
"user_id" integer NOT NULL,
"note" text NOT NULL,
"created_at" timestamp NOT NULL,
"updated_at" timestamp NOT NULL
);

INSERT INTO "admin_notes" ("id", "user_id", "note", "created_at", "updated_at")
VALUES
(1,1,'User 1 called customer support\n','2014-04-03 11:50:25','2014-04-03 11:50:25'),
(2,2,'User 2 called customer support','2014-04-03 11:50:26','2014-04-03 11:50:26'),
(3,5,'User 5 returned the purchase','2014-04-03 11:50:28','2014-04-03 11:50:28');
Loading

0 comments on commit c64f1bb

Please sign in to comment.