forked from cosmicpython/code
-
Notifications
You must be signed in to change notification settings - Fork 0
/
repository.py
156 lines (133 loc) · 5.19 KB
/
repository.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from __future__ import annotations
import abc
import edgedb
import allocation.adapters.pyd_model as model
class SynchronousUpdateError(Exception):
    """Signal that a product write conflicts with the version already stored."""
class AbstractRepository(abc.ABC):
    """Base repository that remembers every product passing through it.

    Subclasses provide the storage-specific ``_add``, ``_get`` and
    ``_get_by_batchref`` coroutines. The public wrappers defined here call
    those hooks and record each stored or truthy fetched product in
    ``self.seen``.
    """

    def __init__(self) -> None:
        # Products added to, or fetched from, this repository instance.
        self.seen: set[model.Product] = set()

    async def add(self, product: model.Product) -> None:
        """Store *product* through ``_add`` and then record it in ``seen``."""
        await self._add(product)
        self.seen.add(product)

    async def get(self, sku: str) -> model.Product | None:
        """Fetch a product by SKU; a truthy result is recorded in ``seen``."""
        found = await self._get(sku=sku)
        if not found:
            # Hand back whatever falsy value the hook produced, untracked.
            return found
        self.seen.add(found)
        return found

    async def get_by_batchref(self, batchref: str) -> model.Product | None:
        """Fetch a product by batch reference; a truthy result is recorded."""
        found = await self._get_by_batchref(batchref=batchref)
        if not found:
            # Hand back whatever falsy value the hook produced, untracked.
            return found
        self.seen.add(found)
        return found

    @abc.abstractmethod
    async def _add(self, product: model.Product) -> None:
        """Storage hook called by ``add``; must be overridden."""
        raise NotImplementedError

    @abc.abstractmethod
    async def _get(self, *args, **kwargs) -> model.Product | None:
        """Storage hook called by ``get``; must be overridden."""
        raise NotImplementedError

    @abc.abstractmethod
    async def _get_by_batchref(self, *args, **kwargs) -> model.Product | None:
        """Storage hook called by ``get_by_batchref``; must be overridden."""
        raise NotImplementedError

    async def list(self) -> list[model.Batch] | None:
        """List batches; not abstract, but raises unless a subclass overrides it."""
        raise NotImplementedError
class EdgeDBRepository(AbstractRepository):
    """Repository that stores products, batches and order lines in EdgeDB.

    All database access goes through the async client passed to the
    constructor.
    """

    def __init__(self, async_client_db) -> None:
        super().__init__()
        self.client: edgedb.AsyncIOClient = async_client_db

    async def _get(
        self,
        sku: str | None = None,
        batchref: str | None = None,
        allocations: bool = True,
    ) -> model.Product | None:
        """Return the product matching *sku* or owning the batch *batchref*.

        Args:
            sku: Product SKU to match; ``None`` leaves this criterion out.
            batchref: Batch reference to match; ``None`` leaves this
                criterion out.
            allocations: When true, each batch's allocations are fetched too.

        Returns:
            The validated ``model.Product``, or ``None`` when nothing matches
            or when neither *sku* nor *batchref* was supplied.
        """
        # Filter only on criteria that were actually supplied. Comparing an
        # omitted (empty) parameter with ``?=`` treats two empty sets as
        # equal, so a product without batches would match every lookup that
        # leaves ``batchref`` out.
        conditions = []
        params = {}
        if sku is not None:
            conditions.append(".sku ?= <str>$sku")
            params["sku"] = sku
        if batchref is not None:
            conditions.append(".batches.reference ?= <str>$reference")
            params["reference"] = batchref
        if not conditions:
            return None

        filter_clause = " OR ".join(conditions)
        allocations_shape = (
            "allocations: { orderid, sku, qty }" if allocations else ""
        )
        obj_ = await self.client.query_single(
            f"""SELECT Product {{
                sku, version_number,
                batches: {{
                    reference,
                    sku,
                    eta,
                    purchased_quantity,
                    {allocations_shape}
                }}
            }}
            FILTER {filter_clause}
            LIMIT 1
            """,
            **params,
        )
        return model.Product.model_validate(obj_) if obj_ else None

    async def _get_by_batchref(self, batchref: str | None) -> model.Product | None:
        """Return the product owning the batch *batchref*, via ``_get``."""
        return await self._get(batchref=batchref)

    async def _add(self, product: model.Product) -> None:
        """Write *product* and its batches, rejecting stale versions.

        Raises:
            SynchronousUpdateError: A product with the same SKU is already
                stored with a ``version_number`` greater than or equal to
                the one on *product*.
        """
        stored = await self.client.query_single(
            """SELECT Product { version_number } FILTER .sku=<str>$sku""",
            sku=product.sku,
        )
        if stored and stored.version_number >= product.version_number:
            raise SynchronousUpdateError(
                "could not serialize access due to concurrent update"
            )
        await self.add_product(product)
        # Recorded before the batch writes, so the product is tracked even if
        # one of them fails.
        self.seen.add(product)
        # A missing, None or empty ``batches`` attribute means nothing to write.
        for batch in getattr(product, "batches", None) or ():
            await self.add_batch(batch)

    async def add_product(self, product: model.Product):
        """Insert the product row, or update it when the SKU already exists.

        Batches are excluded from the payload; they are written separately
        by ``add_batch``.
        """
        data = product.model_dump_json(exclude={"batches"})
        await self.client.query(
            """WITH
                obj := <json>$data,
            INSERT Product {
                sku := <str>obj['sku'],
                version_number := <int16>obj['version_number'],
            }
            UNLESS CONFLICT ON .sku ELSE (
                UPDATE Product SET {
                    sku := <str>obj['sku'],
                    version_number := <int16>obj['version_number'],
                }
            )
            """,
            data=data,
        )

    async def add_batch(self, batch: model.Batch) -> None:
        """Upsert *batch* by reference and insert an order line per allocation.

        NOTE(review): order lines are INSERTed on both the insert and the
        update path, so re-adding a batch presumably re-inserts its
        allocations. Confirm against the schema's constraints.
        """
        data = batch.model_dump_json()
        await self.client.query(
            """WITH
                obj := <json>$data,
                list_orders := <array<json>>obj['allocations'] ?? [<json>{}],
                new_batch := (INSERT Batch {
                    reference := <str>obj['reference'],
                    sku := <str>obj['sku'],
                    eta := <cal::local_date>obj['eta'],
                    purchased_quantity := <int16>obj['purchased_quantity'],
                }
                UNLESS CONFLICT ON .reference ELSE (
                    UPDATE Batch SET {
                        reference := <str>obj['reference'],
                        sku := <str>obj['sku'],
                        eta := <cal::local_date>obj['eta'],
                        purchased_quantity := <int16>obj['purchased_quantity'],
                    }
                )),
            for order_line in array_unpack(list_orders) union (
                INSERT OrderLine {
                    orderid := <str>order_line['orderid'],
                    qty := <int16>order_line['qty'],
                    sku := <str>order_line['sku'],
                    allocated_in := new_batch
                }
            );
            WITH obj := <json>$data,
            SELECT Batch FILTER .reference = <str>obj['reference'];
            """,
            data=data,
        )

    async def list(self) -> list[model.Batch]:
        """Return every stored batch, validated into ``model.Batch``."""
        rows = await self.client.query("""SELECT Batch {**}""")
        return [model.Batch.model_validate(row) for row in rows]