diff --git a/.oca/oca-port/blacklist/spec_driven_model.json b/.oca/oca-port/blacklist/spec_driven_model.json new file mode 100644 index 000000000000..ebb5ebf7873a --- /dev/null +++ b/.oca/oca-port/blacklist/spec_driven_model.json @@ -0,0 +1,6 @@ +{ + "pull_requests": { + "OCA/l10n-brazil#3335": "done in #3442", + "OCA/l10n-brazil#3345": "dotfiles not ported to 15.0 yet" + } +} diff --git a/l10n_br_account_nfe/tests/test_nfce_contingency.py.orig b/l10n_br_account_nfe/tests/test_nfce_contingency.py.orig new file mode 100644 index 000000000000..1710e38f636c --- /dev/null +++ b/l10n_br_account_nfe/tests/test_nfce_contingency.py.orig @@ -0,0 +1,84 @@ +# Copyright 2023 KMEE (Felipe Zago Rodrigues ) +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). + +from odoo.tests import TransactionCase + + +class TestAccountNFCeContingency(TransactionCase): +<<<<<<< HEAD + @classmethod + def setUpClass(cls): + super().setUpClass() + # this hook is required to test l10n_br_account_nfe alone: + cls.env["spec.mixin.nfe"]._register_hook() + cls.document_id = cls.env.ref("l10n_br_nfe.demo_nfce_same_state") + cls.prepare_account_move_nfce() +||||||| constructed merge base + def setUp(self): + super().setUp() + # this hook is required to test l10n_br_account_nfe alone: + self.env["spec.mixin.nfe"]._register_hook() + self.document_id = self.env.ref("l10n_br_nfe.demo_nfce_same_state") + self.prepare_account_move_nfce() +======= + def setUp(self): + super().setUp() + self.document_id = self.env.ref("l10n_br_nfe.demo_nfce_same_state") + self.prepare_account_move_nfce() +>>>>>>> [REM] l10n_br_account_nfe: drop _register_hook + + @classmethod + def prepare_account_move_nfce(cls): + receivable_account_id = cls.env["account.account"].create( + { + "name": "TEST ACCOUNT", + "code": "01.1.1.2.2", + "reconcile": 1, + "company_id": cls.env.ref("base.main_company").id, + "user_type_id": cls.env.ref("account.data_account_type_receivable").id, + } + ) + payable_account_id = cls.env["account.account"].create( + { + "name": "TEST ACCOUNT 2", + "code": "01.1.1.2.3", + "reconcile": 1, + "company_id": cls.env.ref("base.main_company").id, + "user_type_id": cls.env.ref("account.data_account_type_payable").id, + } + ) + payment_method = cls.env.ref("account.account_payment_method_manual_in").id + journal_id = cls.env["account.journal"].create( + { + "name": "JOURNAL TEST", + "code": "TEST", + "type": "bank", + "company_id": cls.env.ref("base.main_company").id, + } + ) + payment_mode = cls.env["account.payment.mode"].create( + { + "name": "PAYMENT MODE TEST", + "company_id": cls.env.ref("base.main_company").id, + "payment_method_id": payment_method, + "fiscal_payment_mode": "15", + "bank_account_link": "fixed", + "fixed_journal_id": journal_id.id, + } + ) + cls.document_move_id = cls.env["account.move"].create( + { + "name": "MOVE TEST", + "payment_mode_id": payment_mode.id, + "company_id": cls.env.ref("base.main_company").id, + "line_ids": [ + (0, 0, {"account_id": receivable_account_id.id, "credit": 10}), + (0, 0, {"account_id": payable_account_id.id, "debit": 10}), + ], + } + ) + cls.document_move_id.fiscal_document_id = cls.document_id.id + + def test_nfce_contingencia(self): + self.document_id._update_nfce_for_offline_contingency() + self.assertIn(self.document_move_id, self.document_id.move_ids) diff --git a/l10n_br_nfe/hooks.py.orig b/l10n_br_nfe/hooks.py.orig new file mode 100644 index 000000000000..847aef3a776c --- /dev/null +++ b/l10n_br_nfe/hooks.py.orig @@ -0,0 +1,54 @@ +# Copyright (C) 2019-2020 - Raphael 
Valyi Akretion +# License AGPL-3 - See http://www.gnu.org/licenses/agpl-3.0.html +import logging + +import nfelib +import pkg_resources +from nfelib.nfe.bindings.v4_0.leiaute_nfe_v4_00 import TnfeProc + +from odoo import SUPERUSER_ID, api +from odoo.exceptions import ValidationError + +from odoo.addons.spec_driven_model import hooks + +_logger = logging.getLogger(__name__) + + +def post_init_hook(cr, registry): + env = api.Environment(cr, SUPERUSER_ID, {}) +<<<<<<< HEAD + hooks.register_hook( + env, "l10n_br_nfe", "odoo.addons.l10n_br_nfe_spec.models.v4_0.leiaute_nfe_v4_00" + ) + +||||||| constructed merge base + env["nfe.40.infnfe"]._register_hook() +======= +>>>>>>> [REF] l10n_br_nfe: further multi-schemas + cr.execute("select demo from ir_module_module where name='l10n_br_nfe';") + is_demo = cr.fetchone()[0] + if is_demo: + res_items = ( + "nfe", + "samples", + "v4_0", + "leiauteNFe", + "35180834128745000152550010000474491454651420-nfe.xml", + ) + resource_path = "/".join(res_items) + nfe_stream = pkg_resources.resource_stream(nfelib.__name__, resource_path) + binding = TnfeProc.from_xml(nfe_stream.read().decode()) + document_number = binding.NFe.infNFe.ide.nNF + existing_nfes = env["l10n_br_fiscal.document"].search( + [("document_number", "=", document_number)] + ) + try: + existing_nfes.unlink() + nfe = ( + env["nfe.40.infnfe"] + .with_context(tracking_disable=True, edoc_type="in") + .build_from_binding("nfe", "40", binding.NFe.infNFe) + ) + _logger.info(nfe.nfe40_emit.nfe40_CNPJ) + except ValidationError: + _logger.info("NF-e already %s imported by hooks" % (document_number,)) diff --git a/spec_driven_model/models/spec_export.py.orig b/spec_driven_model/models/spec_export.py.orig new file mode 100644 index 000000000000..b62a2617a9df --- /dev/null +++ b/spec_driven_model/models/spec_export.py.orig @@ -0,0 +1,232 @@ +# Copyright 2019 KMEE +# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl-3.0.en.html). + +import logging +import sys + +from odoo import api, fields, models + +_logger = logging.getLogger(__name__) + + +class SpecMixinExport(models.AbstractModel): + _name = "spec.mixin_export" + _description = "a mixin providing serialization features" + + @api.model + def _get_binding_class(self, class_obj): + binding_module = sys.modules[self._get_spec_property("binding_module")] + for attr in class_obj._binding_type.split("."): + binding_module = getattr(binding_module, attr) + return binding_module + + @api.model + def _get_model_classes(self): + classes = [getattr(x, "_name", None) for x in type(self).mro()] + return classes + + @api.model + def _get_spec_classes(self, classes=False): + if not classes: + classes = self._get_model_classes() + spec_classes = [] + for c in set(classes): + if c is None: + continue +<<<<<<< HEAD + if not c.startswith("%s." % (self._schema_name,)): +||||||| constructed merge base + if not c.startswith(f"{self._schema_name}."): +======= + if not c.startswith(f"{self._context['spec_schema']}."): +>>>>>>> [REF] spec_driven_model: further multi-schemas + continue + # the following filter to fields to show + # when several XSD class are injected in the same object + if self._context.get("spec_class") and c != self._context["spec_class"]: + continue + spec_classes.append(c) + return spec_classes + + def _export_fields(self, xsd_fields, class_obj, export_dict): + """ + Iterate over the record fields and map them in an dict of values + that will later be injected as **kwargs in the proper XML Python + binding constructors. 
Hence the value can either be simple values or + sub binding instances already properly instantiated. + + This method implements a dynamic dispatch checking if there is any + method called _export_fields_CLASS_NAME to update the xsd_fields + and export_dict variables; this way we allow controlling the + flow of fields to export or injecting specific values in the + field export. + """ + self.ensure_one() + binding_class = self._get_binding_class(class_obj) + binding_class_spec = binding_class.__dataclass_fields__ + + class_name = class_obj._name.replace(".", "_") + export_method_name = "_export_fields_%s" % class_name + if hasattr(self, export_method_name): + xsd_fields = [i for i in xsd_fields] + export_method = getattr(self, export_method_name) + export_method(xsd_fields, class_obj, export_dict) + + for xsd_field in xsd_fields: + if not xsd_field: + continue + if ( + not self._fields.get(xsd_field) + ) and xsd_field not in self._get_stacking_points().keys(): + continue + field_spec_name = xsd_field.split("_")[1] # remove schema prefix + field_spec = False + for fname, fspec in binding_class_spec.items(): + if fspec.metadata.get("name", {}) == field_spec_name: + field_spec_name = fname + if field_spec_name == fname: + field_spec = fspec + if field_spec and not field_spec.init: + # case of xsd fixed values, we should not try to write them + continue + + if not binding_class_spec.get(field_spec_name): + # this can happen with an o2m generated foreign key for instance + continue + field_spec = binding_class_spec[field_spec_name] + field_data = self._export_field( + xsd_field, class_obj, field_spec, export_dict.get(field_spec_name) + ) + if xsd_field in self._get_stacking_points().keys(): + if not field_data: + # stacked nested tags are skipped if empty + continue + elif not self[xsd_field] and not field_data: + continue + + export_dict[field_spec_name] = field_data + + def _export_field(self, xsd_field, class_obj, field_spec, export_value=None): + """ + Map a single Odoo field to a python binding value according to the + kind of field. + """ + self.ensure_one() + # TODO: Export number required fields with Zero.
+ field = class_obj._fields.get( + xsd_field, self._get_stacking_points().get(xsd_field) + ) + xsd_required = field.xsd_required if hasattr(field, "xsd_required") else None + xsd_type = field.xsd_type if hasattr(field, "xsd_type") else None + if field.type == "many2one": + if (not self._get_stacking_points().get(xsd_field)) and ( + not self[xsd_field] and not xsd_required + ): + if field.comodel_name not in self._get_spec_classes(): + return False + if hasattr(field, "xsd_choice_required"): + xsd_required = True + return self._export_many2one(xsd_field, xsd_required, class_obj) + elif self._fields[xsd_field].type == "one2many": + return self._export_one2many(xsd_field, class_obj) + elif self._fields[xsd_field].type == "datetime" and self[xsd_field]: + return self._export_datetime(xsd_field) + elif self._fields[xsd_field].type == "date" and self[xsd_field]: + return self._export_date(xsd_field) + elif ( + self._fields[xsd_field].type in ("float", "monetary") + and self[xsd_field] is not False + ): + if hasattr(field, "xsd_choice_required"): + xsd_required = True + return self._export_float_monetary( + xsd_field, xsd_type, class_obj, xsd_required, export_value + ) + elif type(self[xsd_field]) is str: + return self[xsd_field].strip() + else: + return self[xsd_field] + + def _export_many2one(self, field_name, xsd_required, class_obj=None): + self.ensure_one() + if field_name in self._get_stacking_points().keys(): + return self._build_binding( + class_name=self._get_stacking_points()[field_name].comodel_name + ) + else: + return (self[field_name] or self)._build_binding( + class_name=class_obj._fields[field_name].comodel_name + ) + + def _export_one2many(self, field_name, class_obj=None): + self.ensure_one() + relational_data = [] + for relational_field in self[field_name]: + field_data = relational_field._build_binding( + class_name=class_obj._fields[field_name].comodel_name + ) + relational_data.append(field_data) + return relational_data + + def _export_float_monetary( + self, field_name, xsd_type, class_obj, xsd_required, export_value=None + ): + self.ensure_one() + field_data = export_value or self[field_name] + # TODO check xsd_required for all fields to export? + if not field_data and not xsd_required: + return False + if xsd_type and xsd_type.startswith("TDec"): + tdec = "".join(filter(lambda x: x.isdigit(), xsd_type))[-2:] + else: + tdec = "" + my_format = "%.{}f".format(tdec) + return str(my_format % field_data) + + def _export_date(self, field_name): + self.ensure_one() + return str(self[field_name]) + + def _export_datetime(self, field_name): + self.ensure_one() + return str( + fields.Datetime.context_timestamp( + self, fields.Datetime.from_string(self[field_name]) + ).isoformat("T") + ) + + def _build_binding(self, spec_schema=None, spec_version=None, class_name=None): + """ + Iterate over an Odoo record and its m2o and o2m sub-records + using a pre-order tree traversal and map the Odoo record values + to a dict of Python binding values. + + These values will later be injected as **kwargs in the proper XML Python + binding constructors. Hence the value can either be simple values or + sub binding instances already properly instanciated. 
+ """ + self.ensure_one() + if spec_schema and spec_version: + self = self.with_context(spec_schema=spec_schema, spec_version=spec_version) + self.env[f"spec.mixin.{spec_schema}"]._register_hook() + if not class_name: + class_name = self._get_spec_property("stacking_mixin", self._name) + + class_obj = self.env[class_name] + + xsd_fields = ( + i + for i in class_obj._fields + if class_obj._fields[i].name.startswith(f"{self._spec_prefix()}_") + and "_choice" not in class_obj._fields[i].name + ) + + kwargs = {} + binding_class = self._get_binding_class(class_obj) + self._export_fields(xsd_fields, class_obj, export_dict=kwargs) + sliced_kwargs = { + key: kwargs.get(key) + for key in binding_class.__dataclass_fields__.keys() + if kwargs.get(key) + } + return binding_class(**sliced_kwargs) diff --git a/spec_driven_model/models/spec_import.py.orig b/spec_driven_model/models/spec_import.py.orig new file mode 100644 index 000000000000..e5878672069a --- /dev/null +++ b/spec_driven_model/models/spec_import.py.orig @@ -0,0 +1,364 @@ +# Copyright 2019-2020 Akretion - Raphael Valyi +# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl-3.0.en.html). + +import dataclasses +import inspect +import logging +import re +from datetime import datetime +from enum import Enum +from typing import ForwardRef + +from odoo import api, models + +_logger = logging.getLogger(__name__) + + +tz_datetime = re.compile(r".*[-+]0[0-9]:00$") + + +class SpecMixinImport(models.AbstractModel): + _name = "spec.mixin_import" + _description = """ + A recursive Odoo object builder that works along with the + xsdata object builder from the parsed XML. + Here we take into account the concrete Odoo objects where the schema + mixins where injected and possible matcher or builder overrides. + """ + + @api.model + def build_from_binding(self, spec_schema, spec_version, node, dry_run=False): + """ + Build an instance of an Odoo Model from a pre-populated + Python binding object. Binding object such as the ones generated using + xsdata can indeed be automatically populated from an XML file. + This build method bridges the gap to build the Odoo object. + + It uses a pre-order tree traversal of the Python bindings and for each + sub-binding (or node) it sees what is the corresponding Odoo model to map. + + Build can persist the object or just return a new instance + depending on the dry_run parameter. + + Defaults values and control options are meant to be passed in the context. + """ + model = self.with_context( + spec_schema=spec_schema, spec_version=spec_version + )._get_concrete_model(self._name) + attrs = model.with_context( + dry_run=dry_run, spec_schema=spec_schema, spec_version=spec_version + ).build_attrs(node) + if dry_run: + return model.new(attrs) + else: + return model.create(attrs) + + @api.model + def build_attrs(self, node, path="", defaults_model=None): + """ + Build a new odoo model instance from a Python binding element or + sub-element. Iterates over the binding fields to populate the Odoo fields. + """ + vals = {} + for fname, fspec in node.__dataclass_fields__.items(): + self._build_attr(node, self._fields, vals, path, (fname, fspec)) + vals = self._prepare_import_dict(vals, defaults_model=defaults_model) + return vals + + @api.model + def _build_attr(self, node, fields, vals, path, attr): + """ + Build an Odoo field from a binding attribute. 
+ """ + value = getattr(node, attr[0]) + if value is None or value == []: + return False + prefix = f"{self._spec_prefix(self._context)}" +<<<<<<< HEAD + key = "%s%s" % ( + self._field_prefix, + attr[1].metadata.get("name", attr[0]), + ) +||||||| constructed merge base + key = "{}{}".format( + self._field_prefix, + attr[1].metadata.get("name", attr[0]), + ) +======= + key = f"{prefix}_{attr[1].metadata.get('name', attr[0])}" +>>>>>>> [REF] spec_driven_model: multi-schemas support + child_path = "%s.%s" % (path, key) + + # Is attr a xsd SimpleType or a ComplexType? + # with xsdata a ComplexType can have a type like: + # typing.Union[nfelib.nfe.bindings.v4_0.leiaute_nfe_v4_00.TinfRespTec, NoneType] + # or typing.Union[ForwardRef('Tnfe.InfNfe.Det.Imposto'), NoneType] + # that's why we test if the 1st Union type is a dataclass or a ForwardRef + if attr[1].type == str or ( + not isinstance(attr[1].type.__args__[0], ForwardRef) + and not dataclasses.is_dataclass(attr[1].type.__args__[0]) + ): + # SimpleType + if isinstance(value, Enum): + value = value.value + if fields.get(key) and fields[key].type == "datetime": + if "T" in value: + if tz_datetime.match(value): + old_value = value + value = old_value[:19] + # TODO see python3/pysped/xml_sped/base.py#L692 + value = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S") + + self._build_string_not_simple_type(key, vals, value, node) + + else: + if str(attr[1].type).startswith("typing.List") or "ForwardRef" in str( + attr[1].type + ): # o2m + binding_type = attr[1].type.__args__[0].__forward_arg__ + else: + binding_type = attr[1].type.__args__[0].__name__ + + # ComplexType + if fields.get(key) and fields[key].related: + if fields[key].readonly and fields[key].type == "many2one": + return False # ex: don't import NFe infRespTec + # example: company.nfe40_enderEmit related on partner_id + # then we need to set partner_id, not nfe40_enderEmit + if isinstance(fields[key].related, list): + key = fields[key].related[-1] # -1 works with _inherits + else: + key = fields[key].related + comodel_name = fields[key].comodel_name + else: + clean_type = binding_type.lower() +<<<<<<< HEAD + comodel_name = "%s.%s.%s" % ( + self._schema_name, + self._schema_version.replace(".", "")[0:2], +||||||| constructed merge base + comodel_name = "{}.{}.{}".format( + self._schema_name, + self._schema_version.replace(".", "")[0:2], +======= + comodel_name = "{}.{}.{}".format( + self._context["spec_schema"], + self._context["spec_version"].replace(".", "")[0:2], +>>>>>>> [REF] spec_driven_model: multi-schemas support + clean_type.split(".")[-1], + ) + + comodel = self._get_concrete_model(comodel_name) + if comodel is None: # example skip ICMS100 class + return + if str(attr[1].type).startswith("typing.List"): + # o2m + lines = [] + for line in [li for li in value if li]: + line_vals = comodel.build_attrs( + line, path=child_path, defaults_model=comodel + ) + lines.append((0, 0, line_vals)) + vals[key] = lines + else: + # m2o + comodel_vals = comodel.build_attrs(value, path=child_path) + child_defaults = self._extract_related_values(vals, key) + + comodel_vals.update(child_defaults) + # FIXME comodel._build_many2one + self._build_many2one( + comodel, vals, comodel_vals, key, value, child_path + ) + + @api.model + def _build_string_not_simple_type(self, key, vals, value, node): + vals[key] = value + + @api.model + def _build_many2one(self, comodel, vals, comodel_vals, key, value, path): + if comodel._name == self._name: + # stacked m2o + vals.update(comodel_vals) + else: + vals[key] = 
comodel.match_or_create_m2o(comodel_vals, vals) + + @api.model + def _extract_related_values(self, vals, key): + """ + Example: prepare nfe40_enderEmit partner legal_name and name + by reading nfe40_xNome and nfe40_xFant on nfe40_emit + """ + key_vals = {} + for k, v in self._fields.items(): + if ( + hasattr(v, "related") + and hasattr(v.related, "__len__") + and len(v.related) == 2 + and v.related[0] == key + and vals.get(k) is not None + ): + key_vals[v.related[1]] = vals[k] + return key_vals + + @api.model + def _prepare_import_dict( + self, vals, model=None, parent_dict=None, defaults_model=False + ): + """ + Set non computed field values based on XML values if required. + NOTE: this is debatable if we could use an api multi with values in + self instead of the vals dict. Then that would be like when new() + is used in account_invoice or sale_order before playing some onchanges + """ + if model is None: + model = self + + vals = {k: v for k, v in vals.items() if k in self._fields.keys()} + + related_many2ones = {} + fields = model._fields + field_prefix = f"{self._spec_prefix(self._context)}_" + for k, v in fields.items(): + # select schema choices for a friendly UI: +<<<<<<< HEAD + if k.startswith("%schoice" % (self._field_prefix,)): +||||||| constructed merge base + if k.startswith(f"{self._field_prefix}choice"): +======= + if k.startswith(f"{field_prefix}choice"): +>>>>>>> [REF] spec_driven_model: multi-schemas support + for item in v.selection or []: + if vals.get(item[0]) not in [None, []]: + vals[k] = item[0] + break + + # reverse map related fields as much as possible + elif v.related is not None and vals.get(k) is not None: + if not hasattr(v, "__len__"): + related = v.related.split(".") + else: + related = v.related + if len(related) == 1: + vals[related[0]] = vals.get(k) + elif len(related) == 2 and k.startswith(field_prefix): + related_m2o = related[0] + # don't mess with _inherits write system + if not any(related_m2o == i[1] for i in model._inherits.items()): + key_vals = related_many2ones.get(related_m2o, {}) + key_vals[related[1]] = vals.get(k) + related_many2ones[related_m2o] = key_vals + + # now we deal with the related m2o with compound related + # (example: create Nfe lines product) + for related_m2o, sub_val in related_many2ones.items(): + comodel_name = fields[related_m2o].comodel_name + comodel = model._get_concrete_model(comodel_name) + related_many2ones = model._verify_related_many2ones(related_many2ones) + if hasattr(comodel, "match_or_create_m2o"): + vals[related_m2o] = comodel.match_or_create_m2o(sub_val, vals) + else: # search res.country with Brasil for instance + vals[related_m2o] = model.match_or_create_m2o(sub_val, vals, comodel) + + if defaults_model is not None: + defaults = defaults_model.with_context( + record_dict=vals, + parent_dict=parent_dict, + ).default_get( + [ + f + for f, v in defaults_model._fields.items() + if v.type not in ["binary", "integer", "float", "monetary"] + and v.name not in vals.keys() + ] + ) + vals.update(defaults) + # NOTE: also eventually load default values from the context? 
+ return vals + + @api.model + def _verify_related_many2ones(self, related_many2ones): + return related_many2ones + + @api.model + def match_record(self, rec_dict, parent_dict, model=None): + """ + Inspired from match_* methods from + https://github.com/OCA/edi/blob/11.0/base_business_document_import + /models/business_document_import.py + """ + if model is None: + model = self + default_key = [model._rec_name or "name"] + search_keys = "_%s_search_keys" % (self._context["spec_schema"]) + if hasattr(model, search_keys): + keys = getattr(model, search_keys) + default_key + else: + keys = [model._rec_name or "name"] + keys = self._get_aditional_keys(model, rec_dict, keys) + for key in keys: + if rec_dict.get(key): + # TODO enable to build criteria using parent_dict + # such as state_id when searching for a city + if hasattr(model, "_nfe_extra_domain"): # FIXME make generic + domain = model._nfe_extra_domain + [(key, "=", rec_dict.get(key))] + else: + domain = [(key, "=", rec_dict.get(key))] + match_ids = model.search(domain) + if match_ids: + if len(match_ids) > 1: + _logger.warning( + "!! WARNING more than 1 record found!! model: %s, domain: %s" + % (model, domain) + ) + return match_ids[0].id + return False + + @api.model + def _get_aditional_keys(self, model, rec_dict, keys): + return keys + + @api.model + def match_or_create_m2o(self, rec_dict, parent_dict, model=None): + """ + Often the parent_dict can be used to refine the search. + Passing the model makes it possible to override without inheriting + from this mixin. + """ + # TODO log things in chatter like in base_business_document_import + if model is None: + model = self + if hasattr(model, "_match_record"): + rec_id = model.match_record(rec_dict, parent_dict, model) + else: + rec_id = self.match_record(rec_dict, parent_dict, model) + if not rec_id: + vals = self._prepare_import_dict( + rec_dict, model=model, parent_dict=parent_dict, defaults_model=model + ) + if self._context.get("dry_run"): + rec = model.new(vals) + rec_id = rec.id + # at this point for NewId records, some fields + # may need to be set calling the inverse field functions: + for fname in vals: + field = model._fields.get(fname) + if isinstance(field.inverse, str): + getattr(rec, field.inverse)() + rec.write(vals) # ensure vals values aren't overriden + elif ( + field.inverse + and len(inspect.getfullargspec(field.inverse).args) < 2 + ): + field.inverse() + rec.write(vals) + else: + rec_id = ( + model.with_context( + parent_dict=parent_dict, + lang="en_US", + ) + .create(vals) + .id + ) + return rec_id diff --git a/spec_driven_model/models/spec_mixin.py.orig b/spec_driven_model/models/spec_mixin.py.orig new file mode 100644 index 000000000000..7a60c7344c6a --- /dev/null +++ b/spec_driven_model/models/spec_mixin.py.orig @@ -0,0 +1,251 @@ +# Copyright 2019-TODAY Akretion - Raphael Valyi +# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl-3.0.en.html). + +from importlib import import_module + +from odoo import api, models + +from .spec_models import SPEC_MIXIN_MAPPINGS, SpecModel, StackedModel + + +class SpecMixin(models.AbstractModel): + """ + This is the root "spec" mixin that will be injected dynamically as the parent + of your custom schema mixin (such as spec.mixin.nfe) without the need that + your spec mixin depend on this mixin and on the spec_driven_model module directly + (loose coupling). 
+ This root mixin is typically injected via the _build_model method from SpecModel + or StackedModel that you will be using to inject some spec mixins into + existing Odoo objects. spec.mixin provides generic utility methods such as a + _register_hook, import and export methods. + """ + + _description = "root abstract model meant for xsd generated fiscal models" + _name = "spec.mixin" + _inherit = ["spec.mixin_export", "spec.mixin_import"] + _is_spec_driven = True + + def _valid_field_parameter(self, field, name): + if name in ( + "xsd_type", + "xsd_required", + "choice", + "xsd_choice_required", + "xsd_implicit", + ): + return True + else: + return super()._valid_field_parameter(field, name) + + @api.model + def _get_concrete_model(self, model_name): + "Lookup for concrete models where abstract schema mixins were injected" + if SPEC_MIXIN_MAPPINGS[self.env.cr.dbname].get(model_name) is not None: + return self.env[SPEC_MIXIN_MAPPINGS[self.env.cr.dbname].get(model_name)] + else: + return self.env.get(model_name) + + def _spec_prefix(self, split=False): + """ + Get spec_schema and spec_version from context or from class module + """ + if self._context.get("spec_schema") and self._context.get("spec_version"): + spec_schema = self._context.get("spec_schema") + spec_version = self._context.get("spec_version") + if spec_schema and spec_version: + spec_version = spec_version.replace(".", "")[:2] + if split: + return spec_schema, spec_version + return f"{spec_schema}{spec_version}" + + for ancestor in type(self).mro(): + if not ancestor.__module__.startswith("odoo.addons."): + continue + mod = import_module(".".join(ancestor.__module__.split(".")[:-1])) + if hasattr(mod, "spec_schema"): + spec_schema = mod.spec_schema + spec_version = mod.spec_version.replace(".", "")[:2] + if split: + return spec_schema, spec_version + return f"{spec_schema}{spec_version}" + + return None, None if split else None + + def _register_hook(self): + """ + Called once all modules are loaded. + Here we take all spec models that are not injected into existing concrete + Odoo models and we make them concrete automatically with + their _auto_init method that will create their SQL DDL structure. 
+ """ + res = super()._register_hook() +<<<<<<< HEAD + if not hasattr(self, "_spec_module"): + return res + + load_key = "_%s_loaded" % (self._spec_module,) + if hasattr(self.env.registry, load_key): # already done for registry +||||||| constructed merge base + if "spec_schema" not in self._context: + return res + spec_module = self._get_spec_property("odoo_module") + if not spec_module: + return res + odoo_module = spec_module.split("_spec.")[0].split(".")[-1] + load_key = f"_{spec_module}_loaded" + if hasattr(self.env.registry, load_key): # already done for registry +======= + spec_schema, spec_version = self._spec_prefix(split=True) + if not spec_schema: + return res + + spec_module = self._get_spec_property("odoo_module") + odoo_module = spec_module.split("_spec.")[0].split(".")[-1] + load_key = f"_{spec_module}_loaded" + if hasattr(self.env.registry, load_key): # hook already done for registry +>>>>>>> [FIX] spec_driven_model: register_hook when no ctx + return res + setattr(self.env.registry, load_key, True) + + access_data = [] + access_fields = [] +<<<<<<< HEAD +||||||| constructed merge base + relation_prefix = ( + f"{self._context['spec_schema']}.{self._context['spec_version']}.%" + ) + field_prefix = f"{self._context['spec_schema']}{self._context['spec_version']}_" +======= + field_prefix = f"{spec_schema}{spec_version}" + relation_prefix = f"{spec_schema}.{spec_version}.%" +>>>>>>> [FIX] spec_driven_model: register_hook when no ctx + self.env.cr.execute( + """SELECT DISTINCT relation FROM ir_model_fields + WHERE relation LIKE %s;""", + (f"{self._schema_name}.{self._schema_version.replace('.', '')[:2]}.%",), + ) + # now we will filter only the spec models not injected into some existing class: + remaining_models = { + i[0] + for i in self.env.cr.fetchall() + if self.env.registry.get(i[0]) + and not SPEC_MIXIN_MAPPINGS[self.env.cr.dbname].get(i[0]) + } + for name in remaining_models: + spec_class = StackedModel._odoo_name_to_class(name, self._spec_module) + if spec_class is None: + continue + fields = self.env[spec_class._name].fields_get_keys() + rec_name = next( + filter( + lambda x: ( + x.startswith(self.env[spec_class._name]._field_prefix) + and "_choice" not in x + ), + fields, + ) + ) + model_type = type( + name, + (SpecModel, spec_class), + { + "_name": name, + "_inherit": spec_class._inherit, + "_original_module": "fiscal", + "_odoo_module": self._odoo_module, + "_spec_module": self._spec_module, + "_rec_name": rec_name, + "_module": self._odoo_module, + }, + ) +<<<<<<< HEAD + model_type._schema_name = self._schema_name + model_type._schema_version = self._schema_version + models.MetaModel.module_to_models[self._odoo_module] += [model_type] +||||||| constructed merge base + # we set _spec_schema and _spec_version because + # _build_model will not have context access: + model_type._spec_schema = self._context["spec_schema"] + model_type._spec_version = self._context["spec_version"] + models.MetaModel.module_to_models[odoo_module] += [model_type] +======= + # we set _spec_schema and _spec_version because + # _build_model will not have context access: + model_type._spec_schema = spec_schema + model_type._spec_version = spec_version + models.MetaModel.module_to_models[odoo_module] += [model_type] +>>>>>>> [FIX] spec_driven_model: register_hook when no ctx + + # now we init these models properly + # a bit like odoo.modules.loading#load_module_graph would do + model = model_type._build_model(self.env.registry, self.env.cr) + + self.env[name]._prepare_setup() + 
self.env[name]._setup_base() + self.env[name]._setup_fields() + self.env[name]._setup_complete() + + access_fields = [ + "id", + "name", + "model_id/id", + "group_id/id", + "perm_read", + "perm_write", + "perm_create", + "perm_unlink", + ] + model._auto_fill_access_data(self.env, self._odoo_module, access_data) + + self.env["ir.model.access"].load(access_fields, access_data) + self.env.registry.init_models( + self.env.cr, remaining_models, {"module": self._odoo_module} + ) + return res + + @classmethod + def _auto_fill_access_data(cls, env, module_name: str, access_data: list): + """ + Fill access_data with a default user and a default manager access. + """ + + underline_name = cls._name.replace(".", "_") + model_id = f"{module_name}_spec.model_{underline_name}" + user_access_name = f"access_{underline_name}_user" + if not env["ir.model.access"].search( + [ + ("name", "in", [underline_name, user_access_name]), + ("model_id", "=", model_id), + ] + ): + access_data.append( + [ + user_access_name, + user_access_name, + model_id, + f"{module_name}.group_user", + "1", + "0", + "0", + "0", + ] + ) + manager_access_name = f"access_{underline_name}_manager" + if not env["ir.model.access"].search( + [ + ("name", "in", [underline_name, manager_access_name]), + ("model_id", "=", model_id), + ] + ): + access_data.append( + [ + manager_access_name, + manager_access_name, + model_id, + f"{module_name}.group_manager", + "1", + "1", + "1", + "1", + ] + ) diff --git a/spec_driven_model/models/spec_models.py.orig b/spec_driven_model/models/spec_models.py.orig new file mode 100644 index 000000000000..6105dea9f5d8 --- /dev/null +++ b/spec_driven_model/models/spec_models.py.orig @@ -0,0 +1,368 @@ +# Copyright 2019-TODAY Akretion - Raphael Valyi +# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl-3.0.en.html). + +import logging +import sys +from collections import OrderedDict, defaultdict +from importlib import import_module +from inspect import getmembers, isclass + +from odoo import SUPERUSER_ID, _, api, models +from odoo.tools import mute_logger + +SPEC_MIXIN_MAPPINGS = defaultdict(dict) # by db + +_logger = logging.getLogger(__name__) + + +class SelectionMuteLogger(mute_logger): + """ + The following fields.Selection warnings seem both very hard to + avoid and benign in the spec_driven_model framework context. + All in all, muting these 2 warnings seems like the best option. + """ + + def filter(self, record): + msg = record.getMessage() + if ( + "selection attribute will be ignored" in msg + or "overrides existing selection" in msg + ): + return 0 + return super().filter(record) + + +class SpecModel(models.Model): + """When you inherit this Model, your model becomes concrete just like + models.Model and it can use _inherit to inherit from several xsd generated + spec mixins. + All your model relational fields will be automatically mutated according to + which concrete models the spec mixins were injected in. + Because of this field mutation logic in _build_model, SpecModel should be + inherited the Python way YourModel(spec_models.SpecModel) + and not through _inherit.
""" + + _inherit = "spec.mixin" + _auto = True # automatically create database backend + _register = False # not visible in ORM registry + _abstract = False + _transient = False + + # TODO generic onchange method that checks spec field simple type formats + # xsd_required, according to the considered object context + # and return warning or reformat things + # ideally the list of onchange fields is set dynamically but if it is too + # hard, we can just dump the list of fields when SpecModel is loaded + + # TODO a python constraint on save that ensures xsd_required fields for the + # context are present + + @api.depends(lambda self: (self._rec_name,) if self._rec_name else ()) + def _compute_display_name(self): + "More user friendly when automatic _rec_name is bad" + res = super()._compute_display_name() + for rec in self: + if rec.display_name == "False" or not rec.display_name: + rec.display_name = _("Abrir...") + return res + + + @classmethod + def _spec_prefix(cls, context=None, spec_schema=None, spec_version=None): + if context and context.get("spec_schema"): + spec_schema = context.get("spec_schema") + if context and context.get("spec_version"): + spec_version = context.get("spec_version") + return "%s%s" % (spec_schema, spec_version.replace(".", "")[:2]) + + def _get_spec_property(self, spec_property="", fallback=None): + return getattr( + self, f"_{self._spec_prefix(self._context)}_{spec_property}", fallback + ) + + def _get_stacking_points(self): + return self._get_spec_property("stacking_points", {}) + + @classmethod + def _build_model(cls, pool, cr): + """ + xsd generated spec mixins do not need to depend on this opinionated + module. That's why the spec.mixin is dynamically injected as a parent + class as long as the generated spec mixins inherit from some + spec.mixin.<schema> mixin. + """ + schema = None + if hasattr(cls, "_schema_name"): + schema = cls._schema_name + elif pool.get(cls._name) and hasattr(pool[cls._name], "_schema_name"): + schema = pool[cls._name]._schema_name + if schema and "spec.mixin" not in [ + c._name for c in pool[f"spec.mixin.{schema}"].__bases__ + ]: + spec_mixin = pool[f"spec.mixin.{schema}"] + spec_mixin._inherit = list(spec_mixin._inherit) + ["spec.mixin"] + spec_mixin._BaseModel__base_classes = ( + pool["spec.mixin"], + ) + spec_mixin._BaseModel__base_classes + spec_mixin.__bases__ = (pool["spec.mixin"],) + spec_mixin.__bases__ + + parents = [ + item[0] if isinstance(item, list) else item for item in list(cls._inherit) + ] + for parent in parents: + # this will register that the spec mixins were injected in this class + if not hasattr(pool[parent], "_is_spec_driven"): + continue + cls._map_concrete(cr.dbname, parent, cls._name) + return super()._build_model(pool, cr) + + @api.model + def _setup_base(self): + with SelectionMuteLogger("odoo.fields"): # mute spurious warnings + return super()._setup_base() + + @api.model + def _setup_fields(self): + """ + SpecModel models inherit their fields from XSD generated mixins. + These mixins can either be made concrete or be injected into + existing concrete Odoo models. In that last case, the comodels of the + relational fields pointing to such mixins should be remapped to the + proper concrete models where these mixins are injected.
""" + cls = self.env.registry[self._name] + for klass in cls.__bases__: + if not hasattr(klass, "_is_spec_driven"): + continue + if klass._name != cls._name: + cls._map_concrete(self.env.cr.dbname, klass._name, cls._name) + klass._table = cls._table + + stacked_parents = [getattr(x, "_name", None) for x in cls.mro()] + for name, field in cls._fields.items(): + if hasattr(field, "comodel_name") and field.comodel_name: + comodel_name = field.comodel_name + comodel = self.env[comodel_name] + concrete_class = SPEC_MIXIN_MAPPINGS[self.env.cr.dbname].get( + comodel._name + ) + + if ( + field.type == "many2one" + and concrete_class is not None + and comodel_name not in stacked_parents + ): + _logger.debug( + " MUTATING m2o %s (%s) -> %s", + name, + comodel_name, + concrete_class, + ) + field.original_comodel_name = comodel_name + field.comodel_name = concrete_class + + elif field.type == "one2many": + if concrete_class is not None: + _logger.debug( + " MUTATING o2m %s (%s) -> %s", + name, + comodel_name, + concrete_class, + ) + field.original_comodel_name = comodel_name + field.comodel_name = concrete_class + if not hasattr(field, "inverse_name"): + continue + inv_name = field.inverse_name + for n, f in comodel._fields.items(): + if n == inv_name and f.args and f.args.get("comodel_name"): + _logger.debug( + " MUTATING m2o %s.%s (%s) -> %s", + comodel._name.split(".")[-1], + n, + f.args["comodel_name"], + cls._name, + ) + f.args["original_comodel_name"] = f.args["comodel_name"] + f.args["comodel_name"] = self._name + + return super()._setup_fields() + + @classmethod + def _map_concrete(cls, dbname, key, target, quiet=False): + # TODO bookkeep according to a key to allow multiple injection contexts + if not quiet: + _logger.debug("%s ---> %s" % (key, target)) + global SPEC_MIXIN_MAPPINGS + SPEC_MIXIN_MAPPINGS[dbname][key] = target + + @classmethod + def spec_module_classes(cls, spec_module): + """ + Cache the list of spec_module classes to save calls to + slow reflection API. + """ + spec_module_attr = "_spec_cache_%s" % (spec_module.replace(".", "_"),) + if not hasattr(cls, spec_module_attr): + setattr( + cls, spec_module_attr, getmembers(sys.modules[spec_module], isclass) + ) + return getattr(cls, spec_module_attr) + + @classmethod + def _odoo_name_to_class(cls, odoo_name, spec_module): + for _name, base_class in cls.spec_module_classes(spec_module): + if base_class._name == odoo_name: + return base_class + return None + + +class StackedModel(SpecModel): + """ + XML structures are typically deeply nested as this helps xsd + validation. However, deeply nested objects in Odoo suck because that would + mean crazy joins across many tables and also an endless cascade of form + popups. + + By inheriting from StackedModel instead, your models.Model can + inherit all the mixins that would correspond to the nested xsd + nodes starting from the stacking_mixin. stacking_skip_paths allows you to avoid + stacking specific nodes while stacking_force_paths will stack many2one + entities even if they are not required. + + In Brazil it allows us to have mostly the fiscal + document objects and the fiscal document line object with many details + stacked in a denormalized way inside these two tables only. + Because StackedModel has its _build_model method overridden to do some magic + during module loading it should be inherited the Python way + with MyModel(spec_models.StackedModel).
""" + + _register = False # forces you to inherit StackedModel properly + + @classmethod + def _build_model(cls, pool, cr): + mod = import_module(".".join(cls.__module__.split(".")[:-1])) + if hasattr(cls, "_schema_name"): + schema = cls._schema_name + version = cls._schema_version.replace(".", "")[:2] + else: + mod = import_module(".".join(cls.__module__.split(".")[:-1])) + schema = mod.spec_schema + version = mod.spec_version.replace(".", "")[:2] + spec_prefix = cls._spec_prefix(spec_schema=schema, spec_version=version) + setattr(cls, f"_{spec_prefix}_stacking_points", {}) + stacking_settings = { + "odoo_module": getattr(cls, f"_{spec_prefix}_odoo_module"), # TODO inherit? + "stacking_mixin": getattr(cls, f"_{spec_prefix}_stacking_mixin"), + "stacking_points": getattr(cls, f"_{spec_prefix}_stacking_points"), + "stacking_skip_paths": getattr( + cls, f"_{spec_prefix}_stacking_skip_paths", [] + ), + "stacking_force_paths": getattr( + cls, f"_{spec_prefix}_stacking_force_paths", [] + ), + } + # inject all stacked m2o as inherited classes + _logger.info("building StackedModel %s %s" % (cls._name, cls)) + node = cls._odoo_name_to_class( + stacking_settings["stacking_mixin"], stacking_settings["odoo_module"] + ) + env = api.Environment(cr, SUPERUSER_ID, {}) + for kind, klass, _path, _field_path, _child_concrete in cls._visit_stack( + env, node, stacking_settings + ): + if kind == "stacked" and klass not in cls.__bases__: + cls.__bases__ = (klass,) + cls.__bases__ + return super()._build_model(pool, cr) + + @api.model + def _add_field(self, name, field): + """ + Overridden to avoid adding many2one fields that are in fact "stacking points" + """ + if field.type == "many2one": + for cls in type(self).mro(): + if issubclass(cls, StackedModel): + for attr in dir(cls): + if attr != "_get_stacking_points" and attr.endswith( + "_stacking_points" + ): + if name in getattr(cls, attr).keys(): + return + return super()._add_field(name, field) + + @classmethod + def _visit_stack(cls, env, node, stacking_settings, path=None): + """Pre-order traversal of the stacked models tree. + 1. This method is used to dynamically inherit all the spec models + stacked together from an XML hierarchy. + 2. It is also useful to generate an automatic view of the spec fields. + 3. Finally it is used when exporting as XML. + """ + # We are removing the description of the node + # to avoid translations error + # https://github.com/OCA/l10n-brazil/pull/1272#issuecomment-821806603 + node._description = None + if path is None: + path = stacking_settings["stacking_mixin"].split(".")[-1] + SpecModel._map_concrete(env.cr.dbname, node._name, cls._name, quiet=True) + yield "stacked", node, path, None, None + + fields = OrderedDict() + # this is required when you don't start odoo with -i (update) + # otherwise the model spec will not have its fields loaded yet. + # TODO we may pass this env further instead of re-creating it. + # TODO move setup_base just before the _visit_stack next call + if node._name != cls._name or len(env[node._name]._fields.items()) == 0: + env[node._name]._prepare_setup() + env[node._name]._setup_base() + + field_items = [(k, f) for k, f in env[node._name]._fields.items()] + for i in field_items: + fields[i[0]] = { + "type": i[1].type, + # TODO get with a function (lambda?)
+ "comodel_name": i[1].comodel_name, + "xsd_required": hasattr(i[1], "xsd_required") and i[1].xsd_required, + "xsd_choice_required": hasattr(i[1], "xsd_choice_required") + and i[1].xsd_choice_required, + } + for name, f in fields.items(): + if f["type"] not in [ + "many2one", + "one2many", + ] or name in stacking_settings.get("stacking_skip_paths", ""): + # TODO change for view or export + continue + child = cls._odoo_name_to_class( + f["comodel_name"], stacking_settings["odoo_module"] + ) + if child is None: # Not a spec field + continue + child_concrete = SPEC_MIXIN_MAPPINGS[env.cr.dbname].get(child._name) + field_path = name.split("_")[1] # remove schema prefix + + if f["type"] == "one2many": + yield "one2many", node, path, field_path, child_concrete + continue + + force_stacked = any( + stack_path in path + "." + field_path + for stack_path in stacking_settings.get("stacking_force_paths", []) + ) + + # many2one + if (child_concrete is None or child_concrete == cls._name) and ( + f["xsd_required"] or f["xsd_choice_required"] or force_stacked + ): + # then we will STACK the child in the current class + child._stack_path = path + child_path = "%s.%s" % (path, field_path) + stacking_settings["stacking_points"][name] = env[ + node._name + ]._fields.get(name) + yield from cls._visit_stack(env, child, stacking_settings, child_path) + else: + yield "many2one", node, path, field_path, child_concrete
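Reviewer note: as a reading aid for the .orig files above, here is a minimal sketch of how a concrete model is expected to plug into StackedModel and the import/export mixins, based on the _{schema}{version}_* attributes read by StackedModel._build_model and on the build_from_binding() call visible in l10n_br_nfe/hooks.py.orig. The class name, the l10n_br_nfe_spec module path and the nfelib binding path are illustrative assumptions, not guaranteed to match the ported code.

# Hypothetical usage sketch (names are assumptions, see note above)
from odoo.addons.spec_driven_model.models import spec_models


class FiscalDocument(spec_models.StackedModel):
    _name = "l10n_br_fiscal.document"
    _inherit = ["l10n_br_fiscal.document", "nfe.40.infnfe"]

    # class attributes read by StackedModel._build_model via _spec_prefix():
    _nfe40_odoo_module = "odoo.addons.l10n_br_nfe_spec.models.v4_0.leiaute_nfe_v4_00"
    _nfe40_stacking_mixin = "nfe.40.infnfe"
    _nfe40_stacking_skip_paths = []   # nested nodes not to stack
    _nfe40_stacking_force_paths = []  # optional m2o nodes to stack anyway
    # used by spec.mixin_export._get_binding_class when exporting:
    _nfe40_binding_module = "nfelib.nfe.bindings.v4_0.leiaute_nfe_v4_00"


# import: build (and persist) a record from an xsdata binding, as in hooks.py:
#   env["nfe.40.infnfe"].with_context(edoc_type="in").build_from_binding(
#       "nfe", "40", binding.NFe.infNFe
#   )
# export: build the xsdata binding back from a record:
#   record._build_binding(spec_schema="nfe", spec_version="40")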