From 4802b47f56b5dd02d3c6162fb2a1951eccc24fc4 Mon Sep 17 00:00:00 2001 From: Stephen McDonald Date: Fri, 11 Apr 2014 12:36:44 +1000 Subject: [PATCH 1/2] Added num_procs option to config file, allowing for parallel processing when writing directly to PG. --- mysql2pgsql/lib/config.py | 9 ++++-- mysql2pgsql/lib/converter.py | 63 ++++++++++++++++++++++++++++++++---- mysql2pgsql/mysql2pgsql.py | 13 +++++--- 3 files changed, 72 insertions(+), 13 deletions(-) diff --git a/mysql2pgsql/lib/config.py b/mysql2pgsql/lib/config.py index b7cb281..3aec964 100644 --- a/mysql2pgsql/lib/config.py +++ b/mysql2pgsql/lib/config.py @@ -44,17 +44,17 @@ def reset_configfile(self, file_path): port: 3306 socket: /tmp/mysql.sock username: mysql2psql - password: + password: database: mysql2psql_test compress: false destination: # if file is given, output goes to file, else postgres - file: + file: postgres: hostname: localhost port: 5432 username: mysql2psql - password: + password: database: mysql2psql_test # if tables is given, only the listed tables will be converted. leave empty to convert all tables. @@ -77,4 +77,7 @@ def reset_configfile(self, file_path): # if timezone is true, forces to append/convert to UTC tzinfo mysql data timezone: false + +# number of processes to use when writing directly to postgresql. +num_procs: 1 """ diff --git a/mysql2pgsql/lib/converter.py b/mysql2pgsql/lib/converter.py index 02d5d41..7311de9 100644 --- a/mysql2pgsql/lib/converter.py +++ b/mysql2pgsql/lib/converter.py @@ -1,20 +1,39 @@ from __future__ import absolute_import +from multiprocessing import Process +from threading import Thread +from Queue import Queue + from . 
import print_start_table class Converter(object): - def __init__(self, reader, writer, file_options, verbose=False): + def __init__(self, reader_class, reader_args, writer_class, writer_args, + file_options, num_procs=1, verbose=False): + # We store the read/writer classes and args so that we can create + # new instances for multiprocessing, via the get_reader and + # get_writer methods. self.verbose = verbose - self.reader = reader - self.writer = writer + self.reader_class = reader_class + self.reader_args = reader_args + self.reader = self.get_reader() + self.writer_class = writer_class + self.writer_args = writer_args + self.writer = self.get_writer() self.file_options = file_options + self.num_procs = num_procs self.exclude_tables = file_options.get('exclude_tables', []) self.only_tables = file_options.get('only_tables', []) self.supress_ddl = file_options.get('supress_ddl', None) self.supress_data = file_options.get('supress_data', None) self.force_truncate = file_options.get('force_truncate', None) + def get_reader(self): + return self.reader_class(*self.reader_args) + + def get_writer(self): + return self.writer_class(*self.writer_args) + def convert(self): if self.verbose: print_start_table('>>>>>>>>>> STARTING <<<<<<<<<<\n\n') @@ -22,7 +41,7 @@ def convert(self): tables = [t for t in (t for t in self.reader.tables if t.name not in self.exclude_tables) if not self.only_tables or t.name in self.only_tables] if self.only_tables: tables.sort(key=lambda t: self.only_tables.index(t.name)) - + if not self.supress_ddl: if self.verbose: print_start_table('START CREATING TABLES') @@ -47,8 +66,40 @@ def convert(self): if self.verbose: print_start_table('START WRITING TABLE DATA') - for table in tables: - self.writer.write_contents(table, self.reader) + if self.num_procs == 1: + # No parallel processing - process tables sequentially. + for table in tables: + self.writer.write_contents(table, self.reader) + else: + # Parallel processing. 
Work is CPU bound so we need to + # use multiprocessing, however the MySQL table objects + # can't be pickled, so we're unable to easily build a + # worker pool using multiprocessing, so we use threads + # to manage the worker pool, with each worker thread + # creating a new process for each table transferred. + queue = Queue() + for table in tables: + queue.put(table) + for _ in range(self.num_procs): + queue.put(None) + + def worker(): + writer = self.get_writer() + reader = self.get_reader() + while True: + table = queue.get() + if table is None: + return + proc = Process(target=writer.write_contents, args=(table, reader)) + proc.start() + proc.join() + + threads = [] + for _ in range(self.num_procs): + threads.append(Thread(target=worker)) + threads[-1].start() + for thread in threads: + thread.join() if self.verbose: print_start_table('DONE WRITING TABLE DATA') diff --git a/mysql2pgsql/mysql2pgsql.py b/mysql2pgsql/mysql2pgsql.py index 3c804b8..3f3a7cd 100644 --- a/mysql2pgsql/mysql2pgsql.py +++ b/mysql2pgsql/mysql2pgsql.py @@ -21,14 +21,19 @@ def __init__(self, options): raise e def convert(self): - reader = MysqlReader(self.file_options['mysql']) + reader_class = MysqlReader + reader_args = (self.file_options['mysql'],) + num_procs = 1 if self.file_options['destination']['file']: - writer = PostgresFileWriter(self._get_file(self.file_options['destination']['file']), self.run_options.verbose, tz=self.file_options.get('timezone', False)) + writer_class = PostgresFileWriter + writer_args = (self._get_file(self.file_options['destination']['file']), self.run_options.verbose, self.file_options.get('timezone', False)) else: - writer = PostgresDbWriter(self.file_options['destination']['postgres'], self.run_options.verbose, tz=self.file_options.get('timezone', False)) + writer_class = PostgresDbWriter + writer_args = (self.file_options['destination']['postgres'], self.run_options.verbose, self.file_options.get('timezone', False)) + num_procs = 
self.file_options.get('num_procs', num_procs) - Converter(reader, writer, self.file_options, self.run_options.verbose).convert() + Converter(reader_class, reader_args, writer_class, writer_args, self.file_options, num_procs, self.run_options.verbose).convert() def _get_file(self, file_path): return codecs.open(file_path, 'wb', 'utf-8') From 8cadfe1265a4c84faa86d2a1169c6187c3ea5ecf Mon Sep 17 00:00:00 2001 From: Stephen McDonald Date: Mon, 14 Apr 2014 15:43:56 +1000 Subject: [PATCH 2/2] Need reader/writer per proc to avoid shared pg connection issues. --- mysql2pgsql/lib/converter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mysql2pgsql/lib/converter.py b/mysql2pgsql/lib/converter.py index 7311de9..9134a28 100644 --- a/mysql2pgsql/lib/converter.py +++ b/mysql2pgsql/lib/converter.py @@ -84,9 +84,9 @@ def convert(self): queue.put(None) def worker(): - writer = self.get_writer() - reader = self.get_reader() while True: + writer = self.get_writer() + reader = self.get_reader() table = queue.get() if table is None: return