Introduce postgres transport. This is pretty much a read-only version so far. #46
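For context, a minimal sketch of how the new connection would be driven from a plan, mirroring the spec patterns in this PR (the :pg connection key and the plan wiring are assumptions based on how the mysql and elasticsearch transports are registered, not something this diff confirms):

plan = SpecPlan.new
plan.do! {
  source      = plan.connections[:pg][:forklift_test_source_a]       # assumed key
  destination = plan.connections[:mysql][:forklift_test_destination]
  source.read('SELECT * FROM users') { |rows| destination.write(rows, 'users') }
}
plan.disconnect!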

1 change: 1 addition & 0 deletions forklift_etl.gemspec
@@ -28,4 +28,5 @@ Gem::Specification.new do |s|
  s.add_development_dependency 'rake'
  s.add_development_dependency 'rspec'
  s.add_development_dependency 'email_spec'
  s.add_development_dependency 'pg'
end
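Note that pg is added only as a development dependency; the transport requires it lazily (see the rescue in pg.rb below), so an app that uses the postgres connection must supply the gem itself. A sketch of a consuming app's Gemfile:

gem 'forklift_etl'
gem 'pg' # needed at runtime by Forklift::Connection::Pg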
211 changes: 211 additions & 0 deletions lib/forklift/transports/pg.rb
@@ -0,0 +1,211 @@
require 'delegate'
require 'open3' # used by #dump
require 'uri'   # used by #dump

module Forklift
  module Connection
    class Pg < Forklift::Base::Connection
      def initialize(config, forklift)
        begin
          require 'pg' unless defined?(PG)
        rescue LoadError
          raise "To use the postgres connection you must add 'pg' to your Gemfile"
        end
        super(config, forklift)
      end

      def connect
        @client ||= PG::Connection.new(config)
      end

      def disconnect
        client.close
      end

      def default_matcher
        'updated_at'
      end

      def drop!(table)
        q("DROP TABLE IF EXISTS #{quote_ident(table)}")
      end

      def rename(table, new_table)
        q("ALTER TABLE #{quote_ident(table)} RENAME TO #{quote_ident(new_table)}")
      end

      # Reads `query` in LIMIT/OFFSET pages of `limit` rows, starting at `offset`,
      # yielding each page to the block (or returning the first page when no
      # block is given).
      def read(query, database=current_database, looping=true, limit=forklift.config[:batch_size], offset=0)
        page = 1
        loop do
          result = q(paginate_query(query, page, limit, offset))

          block_given? ? yield(result) : (return result)
          return result if result.num_tuples < limit || !looping
          page += 1
        end
      end

      # Delete-then-insert upsert: when `to_update` is set, rows carrying
      # `primary_key` first delete any existing row with that key; every row
      # is then inserted.
      def write(rows, table, to_update=true, database=current_database, primary_key=:id, lazy=true, crash_on_extra_col=false)
        if tables.include? table
          ensure_row_types(rows, table, database)
        elsif lazy && rows.length > 0
          lazy_table_create(table, rows, database, primary_key)
        end

        columns = self.columns(table, database)
        insert_values = []
        delete_keys = []
        rows.each do |row|
          delete_keys << row[primary_key] if to_update && !row[primary_key].nil?
          insert_values << safe_values(columns, row)
        end

        unless delete_keys.empty?
          q(%{DELETE FROM #{quote_ident(table)} WHERE #{quote_ident(primary_key.to_s)} IN (#{delete_keys.join(',')})})
        end

        unless insert_values.empty?
          q(%{INSERT INTO #{quote_ident(table)} (#{safe_columns(columns)}) VALUES #{insert_values.join(',')}})
        end
        forklift.logger.log "wrote #{rows.length} rows to `#{database}`.`#{table}`"
      end

      # @todo
      def lazy_table_create(table, data, database=current_database, primary_key=:id, matcher=default_matcher)
        raise NotImplementedError.new
      end

      # @todo
      def sql_type(v)
        raise NotImplementedError.new
      end

      def read_since(table, since, matcher=default_matcher, database=current_database, limit=forklift.config[:batch_size])
        query = %{SELECT * FROM #{quote_ident(table)} WHERE #{quote_ident(matcher)} >= #{client.escape_literal(since.to_s)} ORDER BY #{quote_ident(matcher)} ASC}
        self.read(query, database, true, limit) do |rows|
          if block_given?
            yield rows
          else
            return rows
          end
        end
      end

      def max_timestamp(table, matcher=default_matcher)
        # Postgres aliases are double-quoted; AS 'matcher' would be a syntax error.
        row = q(%{SELECT max(#{quote_ident(matcher)}) AS "matcher" FROM #{quote_ident(table)}})[0]
        (row && row[:matcher]) || Time.at(0)
      end

      def tables
        table_list = []
        read(%{SELECT table_name AS "table_name" FROM information_schema.tables WHERE table_schema = 'public'}) do |result|
          table_list << result.map { |r| r[:table_name] }
        end
        table_list.flatten.compact
      end

      def current_database
        client.db
      end

      def count(table)
        q(%{SELECT count(1) AS "count" FROM #{quote_ident(table)}})[0][:count].to_i
      end

      def truncate!(table)
        q("TRUNCATE TABLE #{quote_ident(table)}")
      end

      def truncate(table)
        begin
          self.truncate!(table)
        rescue PG::Error => e
          forklift.logger.debug e
        end
      end

      def columns(table, database=current_database, return_types=false)
        columns = {}
        read(%{SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_name = #{client.escape_literal(table)}}) do |rows|
          rows.each do |row|
            type = case row[:data_type]
                   when 'character varying' then "varchar(#{row[:character_maximum_length]})"
                   else row[:data_type]
                   end
            columns[row[:column_name].to_sym] = type
          end
        end
        return_types ? columns : columns.keys
      end

      def dump(file, options=[])
        dburl = URI::Generic.new('postgresql', "#{client.user}:#{config[:password]}", (client.host || 'localhost'), client.port, nil, "/#{client.db}", nil, nil, nil)
        cmd = %{pg_dump --dbname #{dburl.to_s} -Fp #{options.join(' ')} | gzip > #{file}}
        forklift.logger.log "Dumping #{client.db} to #{file}"
        forklift.logger.debug cmd
        Open3.popen3(cmd) do |_stdin, stdout, stderr, _wait_thr|
          stdout.readlines # drain the pipe
          errors = stderr.readlines
          if errors.length > 0
            raise " > Dump error: #{errors.join(' ')}"
          else
            forklift.logger.log " > Dump complete"
          end
        end
      end

      # @todo
      def exec_script(path)
      end

      def q(query, options={})
        forklift.logger.debug "\tSQL[#{config[:dbname]}]: #{query}"
        Result.new(client.exec(query))
      end

      # Wraps PG::Result so rows come back with symbol keys (row[:count]),
      # matching how the other transports expose rows.
      class Result < SimpleDelegator
        include Enumerable # so #map, #first, etc. also yield symbolized rows

        def initialize(pg_result)
          @pg_result = pg_result
          super(pg_result)
        end

        def [](idx)
          symbolize_row(@pg_result[idx])
        end

        def each
          @pg_result.each do |row|
            yield symbolize_row(row)
          end
        end

        private

        def symbolize_row(row)
          row.inject({}) do |memo, (k, v)|
            memo[k.to_sym] = v
            memo
          end
        end
      end

      private

      def ensure_row_types(rows, table, database=current_database)
        columns = columns(table, database)
        rows.each do |row|
          row.each do |column, value|
            unless columns.include?(column)
              q(%{ALTER TABLE #{quote_ident(table)} ADD COLUMN #{quote_ident(column.to_s)} #{sql_type(value)} NULL DEFAULT NULL})
              columns = columns(table, database)
            end
          end
        end
      end

      # Minimal escaping helpers assumed by #write (hypothetical: the mysql
      # transport defines equivalents, this diff does not).
      def safe_columns(cols)
        cols.map { |c| quote_ident(c.to_s) }.join(', ')
      end

      def safe_values(cols, row)
        '(' + cols.map { |c| safe_value(row[c]) }.join(', ') + ')'
      end

      def safe_value(value)
        case value
        when nil     then 'NULL'
        when Numeric then value.to_s
        when Time    then client.escape_literal(value.strftime('%Y-%m-%d %H:%M:%S'))
        else              client.escape_literal(value.to_s)
        end
      end

      def paginate_query(query, page, page_size, base_offset=0)
        offset = base_offset + ((page - 1) * page_size)
        [query, "LIMIT #{page_size} OFFSET #{offset}"].join(' ')
      end

      def quote_ident(table)
        PG::Connection.quote_ident(table)
      end
    end
  end
end
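A note on #write: it implements a delete-then-insert upsert keyed on primary_key. Since the PR is read-only so far and #write is unexercised by the specs, this is only a hypothetical sketch of the intended behavior (connection name assumed):

pg = plan.connections[:pg][:forklift_test_working]
pg.write([
  { id: 1, name: 'car',  description: 'a car',  inventory: 11 }, # id exists: old row deleted, new values inserted
  { id: 6, name: 'bike', description: 'a bike', inventory: 7 }   # new id: plain insert
], 'products') # (NOT NULL timestamp columns omitted for brevity)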
3 changes: 3 additions & 0 deletions spec/config/connections/pg/forklift_test_destination.yml
@@ -0,0 +1,3 @@
:dbname: forklift_test_destination
:host: 127.0.0.1
:port: 5432
3 changes: 3 additions & 0 deletions spec/config/connections/pg/forklift_test_source_a.yml
@@ -0,0 +1,3 @@
:dbname: forklift_test_source_a
:host: 127.0.0.1
:port: 5432
3 changes: 3 additions & 0 deletions spec/config/connections/pg/forklift_test_source_b.yml
@@ -0,0 +1,3 @@
:dbname: forklift_test_source_b
:host: 127.0.0.1
:port: 5432
3 changes: 3 additions & 0 deletions spec/config/connections/pg/forklift_test_working.yml
@@ -0,0 +1,3 @@
:dbname: forklift_test_working
:host: 127.0.0.1
:port: 5432
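These YAML files deserialize to symbol-keyed hashes that PG::Connection.new accepts directly as an options hash. A sketch, assuming forklift loads connection configs with YAML.load_file as it does for its other transports:

require 'yaml'
require 'pg'

config = YAML.load_file('spec/config/connections/pg/forklift_test_source_a.yml')
# => { dbname: 'forklift_test_source_a', host: '127.0.0.1', port: 5432 }
client = PG::Connection.new(config)
client.exec('SELECT 1')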
58 changes: 35 additions & 23 deletions spec/integration/multi_transport_spec.rb
@@ -1,28 +1,27 @@
require 'spec_helper'

describe 'multiple transport types' do
  describe 'elasticsearch => mysql' do
    before(:each) do
      SpecSeeds.setup_mysql
      SpecSeeds.setup_elasticsearch
    end

    it 'can load in a full query' do
      table = 'es_import'
      index = 'forklift_test'
      query = { query: { match_all: {} } }
      begin
        plan = SpecPlan.new
        plan.do! {
          source = plan.connections[:elasticsearch][:forklift_test]
          destination = plan.connections[:mysql][:forklift_test_destination]
          source.read(index, query) {|data| destination.write(data, table) }
          expect(destination.count('es_import')).to eql(5)
        }
      ensure
        plan.disconnect!
      end
    end

    it 'can load in a partial query' do
@@ -54,13 +53,13 @@
      plan.do! {
        source = plan.connections[:elasticsearch][:forklift_test]
        destination = plan.connections[:mysql][:forklift_test_destination]
        source.read(index, query) {|data|
          clean_data = []
          data.each do |row|
            row[:viewed_at] = Time.at(row[:viewed_at])
            clean_data << row
          end
          destination.write(clean_data, table)
        }
      }
      plan.disconnect!
@@ -72,7 +71,11 @@

  end

  describe 'mysql => elasticsearch' do
    before(:each) do
      SpecSeeds.setup_mysql
      SpecSeeds.setup_elasticsearch
    end

    after(:each) do
      es = SpecClient.elasticsearch('forklift_test')
@@ -94,15 +97,15 @@
      count = destination.count({ index: index })["count"]
      expect(count).to eql 5
    end

    it 'can load in only some rows' do
      table = 'users'
      index = 'users'
      plan = SpecPlan.new
      plan.do! {
        source = plan.connections[:mysql][:forklift_test_source_a]
        destination = plan.connections[:elasticsearch][:forklift_test]
        source.read("select * from #{table}", source.current_database, false, 3, 0) {|data|
          destination.write(data, index)
        }
      }
@@ -114,4 +117,13 @@
    end
  end

  describe 'postgres => mysql' do
    before do
      SpecSeeds.setup_mysql
      SpecSeeds.setup_postgres
    end

    it 'can load in a full table'
    it 'can load in only some rows'
  end
end
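The two pending examples could mirror the elasticsearch => mysql test above; a hypothetical sketch, assuming SpecSeeds.setup_postgres loads the dump below and registers a :pg connection:

it 'can load in a full table' do
  plan = SpecPlan.new
  begin
    plan.do! {
      source = plan.connections[:pg][:forklift_test_source_a]
      destination = plan.connections[:mysql][:forklift_test_destination]
      source.read('SELECT * FROM users') { |data| destination.write(data, 'users') }
      expect(destination.count('users')).to eql(5)
    }
  ensure
    plan.disconnect!
  end
end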
55 changes: 55 additions & 0 deletions spec/support/dumps/pg/forklift_test_source_a.sql
@@ -0,0 +1,55 @@
SET client_min_messages TO WARNING;
DROP TABLE IF EXISTS "products";

CREATE TABLE "products" (
"id" SERIAL NOT NULL PRIMARY KEY,
"name" varchar(255) NOT NULL DEFAULT '',
"description" text NOT NULL,
"inventory" integer DEFAULT NULL,
"created_at" timestamp NOT NULL,
"updated_at" timestamp NOT NULL
);

INSERT INTO "products" ("id", "name", "description", "inventory", "created_at", "updated_at")
VALUES
(1,'car','a car',10,'2014-04-03 11:45:51','2014-04-03 11:45:51'),
(2,'boat','a boat',3,'2014-04-03 11:45:52','2014-04-03 11:45:52'),
(3,'bus','a bus',5,'2014-04-03 11:45:54','2014-04-03 11:45:54'),
(4,'motorcycle','a motorcycle',23,'2014-04-03 11:45:56','2014-04-03 11:45:56'),
(5,'hang_glider','awesome',2,'2014-04-03 11:46:19','2014-04-03 11:46:19');

DROP TABLE IF EXISTS "sales";

CREATE TABLE "sales" (
"id" SERIAL NOT NULL PRIMARY KEY,
"user_id" integer NOT NULL,
"product_id" integer NOT NULL,
"timestamp" timestamp NOT NULL
);

INSERT INTO "sales" ("id", "user_id", "product_id", "timestamp")
VALUES
(1,1,1,'2014-04-03 11:47:11'),
(2,1,2,'2014-04-03 11:47:11'),
(3,4,5,'2014-04-03 11:47:12'),
(4,4,4,'2014-04-03 11:47:25'),
(5,5,5,'2014-04-03 11:47:26');

DROP TABLE IF EXISTS "users";

CREATE TABLE "users" (
"id" SERIAL NOT NULL PRIMARY KEY,
"email" varchar(255) NOT NULL DEFAULT '',
"first_name" varchar(255) NOT NULL DEFAULT '',
"last_name" varchar(255) NOT NULL DEFAULT '',
"created_at" timestamp NOT NULL,
"updated_at" timestamp NOT NULL
);

INSERT INTO "users" ("id", "email", "first_name", "last_name", "created_at", "updated_at")
VALUES
(1,'[email protected]','Evan','T','2014-04-03 11:40:12','2014-04-03 11:39:28'),
(2,'[email protected]','Pablo ','J','2014-04-03 11:41:08','2014-04-03 11:41:08'),
(3,'[email protected]','Kevin','B','2014-04-03 11:41:10','2014-04-03 11:41:10'),
(4,'[email protected]','Brian','L','2014-04-03 11:41:12','2014-04-03 11:41:12'),
(5,'[email protected]','Aaron','B','2014-04-03 11:41:13','2014-04-03 11:41:13');
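With this seed loaded, the transport's incremental read can be exercised against the users table; read_since defaults its matcher to updated_at, so the rows updated at or after the given time come back in ascending order (hypothetical usage, pg being a connected Forklift::Connection::Pg):

pg.read_since('users', '2014-04-03 11:41:10') do |rows|
  rows.each { |row| puts row[:email] } # users 3, 4 and 5 from the seed
end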