Skip to content

Commit

Permalink
TP2000-1428: Fix quota origin update transaction bug + real edits (#1312
Browse files Browse the repository at this point in the history
)

* Make real edits in workbasket

* Fix data kwarg error

* Use real edits for exclusions

* Move test
  • Loading branch information
eadpearce authored Nov 5, 2024
1 parent ce8eac5 commit a661050
Show file tree
Hide file tree
Showing 4 changed files with 587 additions and 39 deletions.
205 changes: 204 additions & 1 deletion common/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
from common.tests import models
from common.tests.util import Dates
from common.tests.util import wrap_numbers_over_max_digits
from common.validators import UpdateType
from geo_areas.models import GeographicalArea
from geo_areas.models import GeographicalAreaDescription
from geo_areas.validators import AreaCode
from workbaskets.validators import WorkflowStatus

pytestmark = pytest.mark.django_db

Expand All @@ -27,7 +32,7 @@
"port": 1234,
"dbname": "dbname",
},
"engine://username:password@host:1234/dbname",
"engine://username:password@host:1234/dbname", # /PS-IGNORE
),
(
{
Expand Down Expand Up @@ -379,3 +384,201 @@ def test_xml_fromstring_vulnerabilities(file_name):
string = open(file).read()
with pytest.raises((XMLSyntaxError, DTDForbidden)):
util.xml_fromstring(string)


def test_make_real_edit_create(date_ranges):
workbasket = factories.WorkBasketFactory.create(
status=WorkflowStatus.EDITING,
)
tx1 = workbasket.new_transaction()

data = {
"area_id": "AAA",
"area_code": AreaCode.COUNTRY,
"valid_between": date_ranges.normal,
}
new_geo_area = util.make_real_edit(
tx=tx1,
cls=GeographicalArea,
obj=None,
data=data,
workbasket=workbasket,
update_type=UpdateType.CREATE,
)
data = {
"described_geographicalarea": new_geo_area,
"description": "Lorem ipsum",
"validity_start": date_ranges.normal.lower,
}
util.make_real_edit(
tx=tx1,
cls=GeographicalAreaDescription,
obj=None,
data=data,
workbasket=workbasket,
update_type=UpdateType.CREATE,
)

# geo area and description created
assert workbasket.tracked_models.count() == 2
assert workbasket.tracked_models.instance_of(GeographicalArea).count() == 1
assert (
workbasket.tracked_models.instance_of(GeographicalAreaDescription).count() == 1
)
assert (
workbasket.tracked_models.instance_of(GeographicalArea).first().update_type
== UpdateType.CREATE
)
assert (
workbasket.tracked_models.instance_of(GeographicalAreaDescription)
.first()
.update_type
== UpdateType.CREATE
)


def test_make_real_edit_update_edit():
# preexisting geo area
geo_area = factories.GeographicalAreaFactory.create(area_id="ABC")

workbasket = factories.WorkBasketFactory.create(
status=WorkflowStatus.EDITING,
)

assert workbasket.tracked_models.count() == 0

tx1 = workbasket.new_transaction()

data = {"area_id": "AAA"}
geo_area_update = util.make_real_edit(
tx=tx1,
cls=GeographicalArea,
obj=geo_area,
data=data,
workbasket=workbasket,
update_type=UpdateType.UPDATE,
)

assert geo_area_update.area_id == "AAA"

assert workbasket.tracked_models.count() == 1
assert workbasket.tracked_models.instance_of(GeographicalArea).count() == 1
assert (
workbasket.tracked_models.instance_of(GeographicalArea).first().update_type
== UpdateType.UPDATE
)

# edit again
tx = workbasket.new_transaction()

data = {"area_id": "BBB"}
geo_area_update2 = util.make_real_edit(
tx=tx,
cls=GeographicalArea,
obj=geo_area_update,
data=data,
workbasket=workbasket,
update_type=UpdateType.UPDATE,
)

assert geo_area_update2.area_id == "BBB"

assert workbasket.tracked_models.count() == 1
assert workbasket.tracked_models.instance_of(GeographicalArea).count() == 1
assert (
workbasket.tracked_models.instance_of(GeographicalArea).first().update_type
== UpdateType.UPDATE
)


def test_make_real_edit_update_delete():
# preexisting geo area
geo_area = factories.GeographicalAreaFactory.create(area_id="ABC")

workbasket = factories.WorkBasketFactory.create(
status=WorkflowStatus.EDITING,
)

assert workbasket.tracked_models.count() == 0

tx1 = workbasket.new_transaction()

data = {"area_id": "AAA"}
geo_area_update = util.make_real_edit(
tx=tx1,
cls=GeographicalArea,
obj=geo_area,
data=data,
workbasket=workbasket,
update_type=UpdateType.UPDATE,
)

assert geo_area_update.area_id == "AAA"

assert workbasket.tracked_models.count() == 1
assert workbasket.tracked_models.instance_of(GeographicalArea).count() == 1
assert (
workbasket.tracked_models.instance_of(GeographicalArea).first().update_type
== UpdateType.UPDATE
)

tx2 = workbasket.new_transaction()

geo_area = workbasket.tracked_models.instance_of(GeographicalArea).first()
deleted_geo_area = util.make_real_edit(
tx=tx2,
cls=GeographicalArea,
obj=geo_area,
data={},
workbasket=workbasket,
update_type=UpdateType.DELETE,
)
assert deleted_geo_area == None
assert workbasket.tracked_models.count() == 1
assert workbasket.tracked_models.instance_of(GeographicalArea).count() == 1
assert (
workbasket.tracked_models.instance_of(GeographicalArea).first().update_type
== UpdateType.DELETE
)


def test_make_real_edit_create_delete():
workbasket = factories.WorkBasketFactory.create(
status=WorkflowStatus.EDITING,
)
tx1 = workbasket.new_transaction()

with tx1:
geo_area = factories.GeographicalAreaFactory.create(area_id="ABC")

# geo area and description created
assert workbasket.tracked_models.count() == 2
assert workbasket.tracked_models.instance_of(GeographicalArea).count() == 1
assert (
workbasket.tracked_models.instance_of(GeographicalAreaDescription).count() == 1
)
assert (
workbasket.tracked_models.instance_of(GeographicalArea).first().update_type
== UpdateType.CREATE
)
assert (
workbasket.tracked_models.instance_of(GeographicalAreaDescription)
.first()
.update_type
== UpdateType.CREATE
)

tx2 = workbasket.new_transaction()

geo_area = workbasket.tracked_models.instance_of(GeographicalArea).first()
deleted_geo_area = util.make_real_edit(
tx=tx2,
cls=GeographicalArea,
obj=geo_area,
data={},
workbasket=workbasket,
update_type=UpdateType.DELETE,
)
# since the FK to geo area on description has on_delete=models.CASCADE this will delete the description as well
assert deleted_geo_area == None
assert workbasket.tracked_models.count() == 0
67 changes: 67 additions & 0 deletions common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from platform import python_version_tuple
from typing import IO
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Sequence
Expand Down Expand Up @@ -52,6 +53,8 @@
from psycopg.types.range import DateRange
from psycopg.types.range import TimestampRange

from common.validators import UpdateType

major, minor, patch = python_version_tuple()


Expand Down Expand Up @@ -694,3 +697,67 @@ def wrapper(wrapped, instance, args, kwargs):
return result

return wrapper


def make_real_edit(
tx: "common.models.Transaction",
cls: Callable,
obj: Optional["common.models.TrackedModel"],
data: dict,
workbasket: "workbasket.models.WorkBasket",
update_type: UpdateType,
):
"""
Checks whether an object exists in the workbasket and makes the appropriate
update/create/delete.
update_type == CREATE:
* New model is created in the workbasket
update_type == UPDATE:
* nothing in workbasket: New UPDATE version created in workbasket
* with CREATE/UPDATE in workbasket: Updates the trackedmodel in the workbasket. No new version is created
update_type == DELETE:
* nothing in workbasket: New DELETE version created in workbasket
* with UPDATE in workbasket: Update type of current change in workbasket is changed to DELETE
* with CREATE in workbasket: Removes the trackedmodel from the workbasket
"""

data = data or {}

if update_type == UpdateType.CREATE:
new_obj = cls(transaction=tx, update_type=update_type, **data)
new_obj.save()
return new_obj

workbasket_objects = workbasket.tracked_models.instance_of(cls)

if not obj:
raise ValueError("If update_type is not CREATE obj param is required.")

if not workbasket_objects.filter(pk=obj.pk).exists():
# if it's not already in the workbasket, create a new version
new_version = cls.objects.get(pk=obj.pk).new_version(
workbasket=workbasket,
transaction=tx,
update_type=update_type,
**data,
)
return new_version

else:
if update_type == UpdateType.UPDATE:
cls.objects.filter(pk=obj.pk).update(transaction=tx, **data)
return cls.objects.get(pk=obj.pk)

elif update_type == UpdateType.DELETE:
# we're deleting the object we just created. remove it from the workbasket
if obj.update_type == UpdateType.CREATE:
obj.delete()

# we're now deleting an object instead of updating it. change the update type
elif obj.update_type == UpdateType.UPDATE:
cls.objects.filter(pk=obj.pk).update(update_type=UpdateType.DELETE)

return None
Loading

0 comments on commit a661050

Please sign in to comment.