Skip to content

Commit

Permalink
oca-port: blacklist PR(s) 3335, 3345 for spec_driven_model
Browse files Browse the repository at this point in the history
  • Loading branch information
rvalyi committed Nov 29, 2024
1 parent e0840ca commit 38eb9ce
Show file tree
Hide file tree
Showing 7 changed files with 1,359 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .oca/oca-port/blacklist/spec_driven_model.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"pull_requests": {
"OCA/l10n-brazil#3335": "done in #3442",
"OCA/l10n-brazil#3345": "dotfiles not ported to 15.0 yet"
}
}
84 changes: 84 additions & 0 deletions l10n_br_account_nfe/tests/test_nfce_contingency.py.orig
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Copyright 2023 KMEE (Felipe Zago Rodrigues <[email protected]>)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).

from odoo.tests import TransactionCase


class TestAccountNFCeContingency(TransactionCase):
<<<<<<< HEAD
@classmethod
def setUpClass(cls):
super().setUpClass()
# this hook is required to test l10n_br_account_nfe alone:
cls.env["spec.mixin.nfe"]._register_hook()
cls.document_id = cls.env.ref("l10n_br_nfe.demo_nfce_same_state")
cls.prepare_account_move_nfce()
||||||| constructed merge base
def setUp(self):
super().setUp()
# this hook is required to test l10n_br_account_nfe alone:
self.env["spec.mixin.nfe"]._register_hook()
self.document_id = self.env.ref("l10n_br_nfe.demo_nfce_same_state")
self.prepare_account_move_nfce()
=======
def setUp(self):
super().setUp()
self.document_id = self.env.ref("l10n_br_nfe.demo_nfce_same_state")
self.prepare_account_move_nfce()
>>>>>>> [REM] l10n_br_account_nfe: drop _register_hook

@classmethod
def prepare_account_move_nfce(cls):
receivable_account_id = cls.env["account.account"].create(
{
"name": "TEST ACCOUNT",
"code": "01.1.1.2.2",
"reconcile": 1,
"company_id": cls.env.ref("base.main_company").id,
"user_type_id": cls.env.ref("account.data_account_type_receivable").id,
}
)
payable_account_id = cls.env["account.account"].create(
{
"name": "TEST ACCOUNT 2",
"code": "01.1.1.2.3",
"reconcile": 1,
"company_id": cls.env.ref("base.main_company").id,
"user_type_id": cls.env.ref("account.data_account_type_payable").id,
}
)
payment_method = cls.env.ref("account.account_payment_method_manual_in").id
journal_id = cls.env["account.journal"].create(
{
"name": "JOURNAL TEST",
"code": "TEST",
"type": "bank",
"company_id": cls.env.ref("base.main_company").id,
}
)
payment_mode = cls.env["account.payment.mode"].create(
{
"name": "PAYMENT MODE TEST",
"company_id": cls.env.ref("base.main_company").id,
"payment_method_id": payment_method,
"fiscal_payment_mode": "15",
"bank_account_link": "fixed",
"fixed_journal_id": journal_id.id,
}
)
cls.document_move_id = cls.env["account.move"].create(
{
"name": "MOVE TEST",
"payment_mode_id": payment_mode.id,
"company_id": cls.env.ref("base.main_company").id,
"line_ids": [
(0, 0, {"account_id": receivable_account_id.id, "credit": 10}),
(0, 0, {"account_id": payable_account_id.id, "debit": 10}),
],
}
)
cls.document_move_id.fiscal_document_id = cls.document_id.id

def test_nfce_contingencia(self):
self.document_id._update_nfce_for_offline_contingency()
self.assertIn(self.document_move_id, self.document_id.move_ids)
54 changes: 54 additions & 0 deletions l10n_br_nfe/hooks.py.orig
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright (C) 2019-2020 - Raphael Valyi Akretion
# License AGPL-3 - See http://www.gnu.org/licenses/agpl-3.0.html
import logging

import nfelib
import pkg_resources
from nfelib.nfe.bindings.v4_0.leiaute_nfe_v4_00 import TnfeProc

from odoo import SUPERUSER_ID, api
from odoo.exceptions import ValidationError

from odoo.addons.spec_driven_model import hooks

_logger = logging.getLogger(__name__)


def post_init_hook(cr, registry):
env = api.Environment(cr, SUPERUSER_ID, {})
<<<<<<< HEAD
hooks.register_hook(
env, "l10n_br_nfe", "odoo.addons.l10n_br_nfe_spec.models.v4_0.leiaute_nfe_v4_00"
)

||||||| constructed merge base
env["nfe.40.infnfe"]._register_hook()
=======
>>>>>>> [REF] l10n_br_nfe: further multi-schemas
cr.execute("select demo from ir_module_module where name='l10n_br_nfe';")
is_demo = cr.fetchone()[0]
if is_demo:
res_items = (
"nfe",
"samples",
"v4_0",
"leiauteNFe",
"35180834128745000152550010000474491454651420-nfe.xml",
)
resource_path = "/".join(res_items)
nfe_stream = pkg_resources.resource_stream(nfelib.__name__, resource_path)
binding = TnfeProc.from_xml(nfe_stream.read().decode())
document_number = binding.NFe.infNFe.ide.nNF
existing_nfes = env["l10n_br_fiscal.document"].search(
[("document_number", "=", document_number)]
)
try:
existing_nfes.unlink()
nfe = (
env["nfe.40.infnfe"]
.with_context(tracking_disable=True, edoc_type="in")
.build_from_binding("nfe", "40", binding.NFe.infNFe)
)
_logger.info(nfe.nfe40_emit.nfe40_CNPJ)
except ValidationError:
_logger.info("NF-e already %s imported by hooks" % (document_number,))
232 changes: 232 additions & 0 deletions spec_driven_model/models/spec_export.py.orig
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
# Copyright 2019 KMEE
# License LGPL-3.0 or later (https://www.gnu.org/licenses/lgpl-3.0.en.html).

import logging
import sys

from odoo import api, fields, models

_logger = logging.getLogger(__name__)


class SpecMixinExport(models.AbstractModel):
_name = "spec.mixin_export"
_description = "a mixin providing serialization features"

@api.model
def _get_binding_class(self, class_obj):
binding_module = sys.modules[self._get_spec_property("binding_module")]
for attr in class_obj._binding_type.split("."):
binding_module = getattr(binding_module, attr)
return binding_module

@api.model
def _get_model_classes(self):
classes = [getattr(x, "_name", None) for x in type(self).mro()]
return classes

@api.model
def _get_spec_classes(self, classes=False):
if not classes:
classes = self._get_model_classes()
spec_classes = []
for c in set(classes):
if c is None:
continue
<<<<<<< HEAD
if not c.startswith("%s." % (self._schema_name,)):
||||||| constructed merge base
if not c.startswith(f"{self._schema_name}."):
=======
if not c.startswith(f"{self._context['spec_schema']}."):
>>>>>>> [REF] spec_driven_model: further multi-schemas
continue
# the following filter to fields to show
# when several XSD class are injected in the same object
if self._context.get("spec_class") and c != self._context["spec_class"]:
continue
spec_classes.append(c)
return spec_classes

def _export_fields(self, xsd_fields, class_obj, export_dict):
"""
Iterate over the record fields and map them in an dict of values
that will later be injected as **kwargs in the proper XML Python
binding constructors. Hence the value can either be simple values or
sub binding instances already properly instanciated.

This method implements a dynamic dispatch checking if there is any
method called _export_fields_CLASS_NAME to update the xsd_fields
and export_dict variables, this way we allow controlling the
flow of fields to export or injecting specific values in the
field export.
"""
self.ensure_one()
binding_class = self._get_binding_class(class_obj)
binding_class_spec = binding_class.__dataclass_fields__

class_name = class_obj._name.replace(".", "_")
export_method_name = "_export_fields_%s" % class_name
if hasattr(self, export_method_name):
xsd_fields = [i for i in xsd_fields]
export_method = getattr(self, export_method_name)
export_method(xsd_fields, class_obj, export_dict)

for xsd_field in xsd_fields:
if not xsd_field:
continue
if (
not self._fields.get(xsd_field)
) and xsd_field not in self._get_stacking_points().keys():
continue
field_spec_name = xsd_field.split("_")[1] # remove schema prefix
field_spec = False
for fname, fspec in binding_class_spec.items():
if fspec.metadata.get("name", {}) == field_spec_name:
field_spec_name = fname
if field_spec_name == fname:
field_spec = fspec
if field_spec and not field_spec.init:
# case of xsd fixed values, we should not try to write them
continue

if not binding_class_spec.get(field_spec_name):
# this can happen with a o2m generated foreign key for instance
continue
field_spec = binding_class_spec[field_spec_name]
field_data = self._export_field(
xsd_field, class_obj, field_spec, export_dict.get(field_spec_name)
)
if xsd_field in self._get_stacking_points().keys():
if not field_data:
# stacked nested tags are skipped if empty
continue
elif not self[xsd_field] and not field_data:
continue

export_dict[field_spec_name] = field_data

def _export_field(self, xsd_field, class_obj, field_spec, export_value=None):
"""
Map a single Odoo field to a python binding value according to the
kind of field.
"""
self.ensure_one()
# TODO: Export number required fields with Zero.
field = class_obj._fields.get(
xsd_field, self._get_stacking_points().get(xsd_field)
)
xsd_required = field.xsd_required if hasattr(field, "xsd_required") else None
xsd_type = field.xsd_type if hasattr(field, "xsd_type") else None
if field.type == "many2one":
if (not self._get_stacking_points().get(xsd_field)) and (
not self[xsd_field] and not xsd_required
):
if field.comodel_name not in self._get_spec_classes():
return False
if hasattr(field, "xsd_choice_required"):
xsd_required = True
return self._export_many2one(xsd_field, xsd_required, class_obj)
elif self._fields[xsd_field].type == "one2many":
return self._export_one2many(xsd_field, class_obj)
elif self._fields[xsd_field].type == "datetime" and self[xsd_field]:
return self._export_datetime(xsd_field)
elif self._fields[xsd_field].type == "date" and self[xsd_field]:
return self._export_date(xsd_field)
elif (
self._fields[xsd_field].type in ("float", "monetary")
and self[xsd_field] is not False
):
if hasattr(field, "xsd_choice_required"):
xsd_required = True
return self._export_float_monetary(
xsd_field, xsd_type, class_obj, xsd_required, export_value
)
elif type(self[xsd_field]) is str:
return self[xsd_field].strip()
else:
return self[xsd_field]

def _export_many2one(self, field_name, xsd_required, class_obj=None):
self.ensure_one()
if field_name in self._get_stacking_points().keys():
return self._build_binding(
class_name=self._get_stacking_points()[field_name].comodel_name
)
else:
return (self[field_name] or self)._build_binding(
class_name=class_obj._fields[field_name].comodel_name
)

def _export_one2many(self, field_name, class_obj=None):
self.ensure_one()
relational_data = []
for relational_field in self[field_name]:
field_data = relational_field._build_binding(
class_name=class_obj._fields[field_name].comodel_name
)
relational_data.append(field_data)
return relational_data

def _export_float_monetary(
self, field_name, xsd_type, class_obj, xsd_required, export_value=None
):
self.ensure_one()
field_data = export_value or self[field_name]
# TODO check xsd_required for all fields to export?
if not field_data and not xsd_required:
return False
if xsd_type and xsd_type.startswith("TDec"):
tdec = "".join(filter(lambda x: x.isdigit(), xsd_type))[-2:]
else:
tdec = ""
my_format = "%.{}f".format(tdec)
return str(my_format % field_data)

def _export_date(self, field_name):
self.ensure_one()
return str(self[field_name])

def _export_datetime(self, field_name):
self.ensure_one()
return str(
fields.Datetime.context_timestamp(
self, fields.Datetime.from_string(self[field_name])
).isoformat("T")
)

def _build_binding(self, spec_schema=None, spec_version=None, class_name=None):
"""
Iterate over an Odoo record and its m2o and o2m sub-records
using a pre-order tree traversal and map the Odoo record values
to a dict of Python binding values.

These values will later be injected as **kwargs in the proper XML Python
binding constructors. Hence the value can either be simple values or
sub binding instances already properly instanciated.
"""
self.ensure_one()
if spec_schema and spec_version:
self = self.with_context(spec_schema=spec_schema, spec_version=spec_version)
self.env[f"spec.mixin.{spec_schema}"]._register_hook()
if not class_name:
class_name = self._get_spec_property("stacking_mixin", self._name)

class_obj = self.env[class_name]

xsd_fields = (
i
for i in class_obj._fields
if class_obj._fields[i].name.startswith(f"{self._spec_prefix()}_")
and "_choice" not in class_obj._fields[i].name
)

kwargs = {}
binding_class = self._get_binding_class(class_obj)
self._export_fields(xsd_fields, class_obj, export_dict=kwargs)
sliced_kwargs = {
key: kwargs.get(key)
for key in binding_class.__dataclass_fields__.keys()
if kwargs.get(key)
}
return binding_class(**sliced_kwargs)
Loading

0 comments on commit 38eb9ce

Please sign in to comment.