-
Notifications
You must be signed in to change notification settings - Fork 62
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
232 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
#!/usr/bin/env python | ||
# -*- coding: utf-8 -*- | ||
# Copyright 2019 ACSONE SA/NV (<http://acsone.eu>) | ||
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html). | ||
|
||
import os | ||
import shutil | ||
|
||
import click | ||
import click_odoo | ||
import psycopg2 | ||
from click_odoo import OdooEnvironment, odoo | ||
|
||
from ._dbutils import db_exists, db_management_enabled, reset_config_parameters | ||
from .backupdb import DBDUMP_FILENAME, FILESTORE_DIRAME, MANIFEST_FILENAME | ||
|
||
|
||
def _restore_from_folder(dbname, backup, copy=True, jobs=1): | ||
manifest_file_path = os.path.join(backup, MANIFEST_FILENAME) | ||
dbdump_file_path = os.path.join(backup, DBDUMP_FILENAME) | ||
filestore_dir_path = os.path.join(backup, FILESTORE_DIRAME) | ||
if not os.path.exists(manifest_file_path) or not os.path.exists(dbdump_file_path): | ||
msg = ( | ||
"{} is not folder backup created by the backupdb command. " | ||
"{} and {} files are missing.".format( | ||
backup, MANIFEST_FILENAME, DBDUMP_FILENAME | ||
) | ||
) | ||
raise click.ClickException(msg) | ||
|
||
odoo.service.db._create_empty_database(dbname) | ||
pg_args = ["--jobs", str(jobs), "--dbname", dbname, "--no-owner", dbdump_file_path] | ||
if odoo.tools.exec_pg_command("pg_restore", *pg_args): | ||
raise click.ClickException("Couldn't restore database") | ||
if copy: | ||
# if it's a copy of a database, force generation of a new dbuuid | ||
reset_config_parameters(dbname) | ||
with OdooEnvironment(dbname) as env: | ||
if os.path.exists(filestore_dir_path): | ||
filestore_dest = env["ir.attachment"]._filestore() | ||
shutil.move(filestore_dir_path, filestore_dest) | ||
|
||
if odoo.tools.config["unaccent"]: | ||
try: | ||
with env.cr.savepoint(): | ||
env.cr.execute("CREATE EXTENSION unaccent") | ||
except psycopg2.Error: | ||
pass | ||
odoo.sql_db.close_db(dbname) | ||
|
||
|
||
def _restore_from_file(dbname, backup, copy=True): | ||
with db_management_enabled(), open(backup, "rb") as backup_file: | ||
odoo.service.db.restore_db(dbname, backup_file, copy) | ||
odoo.sql_db.close_db(dbname) | ||
|
||
|
||
@click.command() | ||
@click_odoo.env_options( | ||
default_log_level="warn", with_database=False, with_rollback=False | ||
) | ||
@click.option( | ||
"--copy/--move", | ||
default=True, | ||
help="This database is a copy.\nIn order " | ||
"to avoid conflicts between databases, Odoo needs to know if this" | ||
"database was moved or copied. If you don't know, set is a copy.", | ||
) | ||
@click.option( | ||
"--force", | ||
is_flag=True, | ||
show_default=True, | ||
help="Don't report error if destination database already exists. If " | ||
"force and destination database exists, it will be dropped before " | ||
"restore.", | ||
) | ||
@click.option( | ||
"--jobs", | ||
help="Uses this many parallel jobs to restore. (Only used to restore" | ||
"folder backup)", | ||
type=int, | ||
default=1, | ||
) | ||
@click.argument("dbname", nargs=1) | ||
@click.argument( | ||
"backup", | ||
nargs=1, | ||
type=click.Path( | ||
exists=True, file_okay=True, dir_okay=True, readable=True, resolve_path=True | ||
), | ||
) | ||
def main(env, dbname, backup, copy, force, jobs): | ||
""" Restore an Odoo database backup. | ||
This script allows you to restore databses created by using the Odoo | ||
web interface or the backupdb script. This | ||
avoids timeout and file size limitation problems when | ||
databases are too large. | ||
""" | ||
if db_exists(dbname): | ||
msg = "Destination database already exists: {}".format(dbname) | ||
if not force: | ||
raise click.ClickException(msg) | ||
msg = "{} -> drop".format(msg) | ||
click.echo(click.style(msg, fg="yellow")) | ||
with db_management_enabled(): | ||
odoo.service.db.exp_drop(dbname) | ||
if os.path.isfile(backup): | ||
_restore_from_file(dbname, backup, copy) | ||
else: | ||
_restore_from_folder(dbname, backup, copy, jobs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
# Copyright 2018 ACSONE SA/NV (<http://acsone.eu>) | ||
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html). | ||
|
||
import operator | ||
import os | ||
import shutil | ||
import subprocess | ||
from collections import defaultdict | ||
|
||
import click_odoo | ||
import pytest | ||
from click.testing import CliRunner | ||
from click_odoo import odoo | ||
|
||
from click_odoo_contrib._dbutils import db_exists | ||
from click_odoo_contrib.backupdb import main as backupdb | ||
from click_odoo_contrib.restoredb import main as restoredb | ||
|
||
TEST_DBNAME = "click-odoo-contrib-testrestoredb" | ||
|
||
_DEFAULT_IR_CONFIG_PARAMETERS = ["database.uuid", "database.create_date"] | ||
|
||
|
||
def _createdb(dbname): | ||
subprocess.check_call(["createdb", dbname]) | ||
|
||
|
||
def _dropdb(dbname): | ||
subprocess.check_call(["dropdb", "--if-exists", dbname]) | ||
|
||
|
||
def _dropdb_odoo(dbname): | ||
_dropdb(dbname) | ||
filestore_dir = odoo.tools.config.filestore(dbname) | ||
if os.path.isdir(filestore_dir): | ||
shutil.rmtree(filestore_dir) | ||
|
||
|
||
def _check_default_params(db1, db2, operator): | ||
params_by_db = defaultdict(dict) | ||
for db in (db1, db2): | ||
with click_odoo.OdooEnvironment(database=db) as env: | ||
IrConfigParameters = env["ir.config_parameter"] | ||
for key in _DEFAULT_IR_CONFIG_PARAMETERS: | ||
params_by_db[db][key] = IrConfigParameters.get_param(key) | ||
params1 = params_by_db[db1] | ||
params2 = params_by_db[db2] | ||
assert set(params1.keys()) == set(params2.keys()) | ||
for k, v in params1.items(): | ||
assert operator(v, params2[k]) | ||
|
||
|
||
@pytest.fixture(params=["folder", "zip"]) | ||
def backup(request, odoodb, odoocfg, tmp_path): | ||
if request.param == "folder": | ||
name = "backup" | ||
else: | ||
name = "backup.zip" | ||
path = tmp_path.joinpath(name) | ||
posix_path = path.as_posix() | ||
CliRunner().invoke( | ||
backupdb, ["--format={}".format(request.param), odoodb, posix_path] | ||
) | ||
yield posix_path, odoodb | ||
|
||
|
||
def test_db_restore(backup): | ||
assert not db_exists(TEST_DBNAME) | ||
backup_path, original_db = backup | ||
try: | ||
result = CliRunner().invoke(restoredb, [TEST_DBNAME, backup_path]) | ||
assert result.exit_code == 0 | ||
assert db_exists(TEST_DBNAME) | ||
# default restore mode is copy -> default params are not preserved | ||
_check_default_params(TEST_DBNAME, original_db, operator.ne) | ||
finally: | ||
_dropdb_odoo(TEST_DBNAME) | ||
|
||
|
||
def test_db_restore_target_exists(backup): | ||
_createdb(TEST_DBNAME) | ||
backup_path, original_db = backup | ||
try: | ||
result = CliRunner().invoke(restoredb, [TEST_DBNAME, backup_path]) | ||
assert result.exit_code != 0, result.output | ||
assert "Destination database already exists" in result.output | ||
finally: | ||
_dropdb_odoo(TEST_DBNAME) | ||
try: | ||
result = CliRunner().invoke(restoredb, ["--force", TEST_DBNAME, backup_path]) | ||
assert result.exit_code == 0 | ||
assert db_exists(TEST_DBNAME) | ||
finally: | ||
_dropdb_odoo(TEST_DBNAME) | ||
|
||
|
||
def test_db_restore_move(backup): | ||
assert not db_exists(TEST_DBNAME) | ||
backup_path, original_db = backup | ||
try: | ||
result = CliRunner().invoke(restoredb, ["--move", TEST_DBNAME, backup_path]) | ||
assert result.exit_code == 0 | ||
assert db_exists(TEST_DBNAME) | ||
# when database is moved, default params are preserved | ||
_check_default_params(TEST_DBNAME, original_db, operator.eq) | ||
finally: | ||
_dropdb_odoo(TEST_DBNAME) |