Skip to content

Commit

Permalink
Update Schema and Data Loading Script for PoE Items (#25)
Browse files Browse the repository at this point in the history
* chore: remove special category filter for items API

* chore: remove link between category and poe items documents

* refactor: improve item price info schema handling

- store price info as-is, skipping json serialization
- aptly convert pricing decimal types when loading or storing data

* fix: remove invalid import from poe script

* fix: avoid using reserved '+' symbol for sort query params

* chore: remove unneeded 400 error response example from items API

* refactor: modify poe items' base and price schema

* chore: add new fields' support in data-loader script

* fix: fix data loader script's errors

- use new league
- remove outdated 'Coffins' category group
- set category's internal name rather than link to category record on item
- make ItemSparkline's fields nullable

* chore: adjust item price schema

* refactor: update data loading script to support new changes

* chore: add serialization method on ItemPrice model

* [wip]refactor: improve price prediction script's performance

- update script to accommodate latest changes
- start integrating message passing using queues for efficiency
- wip: use concurrent API calls to get price history data for multiple items

* Revert "[wip]refactor: improve price prediction script's performance"

This reverts commit a34fc04.
  • Loading branch information
dhruv-ahuja authored Aug 21, 2024
1 parent 5064d74 commit e9a05ac
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 65 deletions.
4 changes: 1 addition & 3 deletions src/models/poe.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from beanie import Link

from src.models.common import DateMetadataDocument
from src.schemas.poe import ItemBase

Expand All @@ -22,7 +20,7 @@ class Item(ItemBase, DateMetadataDocument):
"""Item represents a Path of Exile in-game item. Each item belongs to a category. It contains information such as
item type and the current, past and predicted pricing, encapsulated in the `ItemPrice` schema."""

category: Link[ItemCategory]
category: str | None = None

class Settings:
"""Defines the settings for the collection."""
Expand Down
7 changes: 3 additions & 4 deletions src/routers/poe.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ async def get_all_categories():


@router.get("/items", responses=resp.GET_ITEMS_RESPONSES)
async def get_items_by_group(
category_group: str | None = Query(None, min_length=3, max_length=50),
async def get_items(
pagination: PaginationInput = Depends(), # using Depends allows us to encapsulate Query params within Pydantic models
filter_: list[str] | None = Query(None, alias="filter"),
sort: list[str] | None = Query(None),
):
"""Gets a list of all items belonging to the given category group."""
"""Gets a list of all items, modified by any given parameters."""

filter_sort_input = FilterSortInput(sort=sort, filter=filter_)
items, total_items = await service.get_items(category_group, pagination, filter_sort_input)
items, total_items = await service.get_items(pagination, filter_sort_input)

response = create_pagination_response(items, total_items, pagination, "items")
return AppResponse(response)
59 changes: 49 additions & 10 deletions src/schemas/poe.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
import datetime as dt
from decimal import Decimal
from enum import Enum
from typing import TypedDict
from typing import Annotated, TypedDict, cast

from pydantic import BaseModel, Field, Json
from bson import Decimal128
from pydantic import BaseModel, BeforeValidator, Field

from src.utils.jobs import convert_decimal


def convert_decimal_values(values: dict[dt.datetime, str | Decimal128 | Decimal]) -> dict[dt.datetime, Decimal]:
    """Returns a new dictionary with the same keys as `values`, where every value is a `Decimal`.

    `Decimal128` values are unwrapped through `to_decimal()`; any other value is passed to the `Decimal`
    constructor. The input dictionary is left untouched."""

    return {
        key: raw_value.to_decimal() if isinstance(raw_value, Decimal128) else Decimal(raw_value)
        for key, raw_value in values.items()
    }


def convert_current_price(value: Decimal | Decimal128 | str) -> Decimal:
    """Converts a single price value into a `Decimal`.

    `Decimal128` values are unwrapped through `to_decimal()`; anything else is passed to the `Decimal`
    constructor."""

    if isinstance(value, Decimal128):
        return value.to_decimal()

    return Decimal(value)


class Currency(str, Enum):
Expand All @@ -20,12 +37,33 @@ class ItemPrice(BaseModel):
"""ItemPrice holds information regarding the current, past and future price of an item.
It stores the recent and predicted prices in a dictionary, with the date as the key."""

price: Decimal
currency: Currency
price_history: dict[dt.datetime, Decimal]
price_history_currency: Currency
price_prediction: dict[dt.datetime, Decimal]
price_prediction_currency: Currency
chaos_price: Annotated[Decimal, BeforeValidator(convert_current_price)] = Decimal(0)
divine_price: Annotated[Decimal, BeforeValidator(convert_current_price)] = Decimal(0)
price_history: Annotated[dict[dt.datetime, Decimal], BeforeValidator(convert_decimal_values)] = {}
price_history_currency: Currency = Currency.chaos
price_prediction: Annotated[dict[dt.datetime, Decimal], BeforeValidator(convert_decimal_values)] = {}
price_prediction_currency: Currency = Currency.chaos
low_confidence: bool = False
listings: int = 0

def serialize(self) -> dict:
    """Dumps the instance into a dictionary that is compatible with MongoDB. The datetime keys of the price
    history and price prediction mappings become strings, and Decimal values become Decimal128 values."""

    payload = self.model_dump()

    # replace both date-keyed price mappings with string-keyed copies
    for field_name in ("price_history", "price_prediction"):
        prices = getattr(self, field_name)
        payload[field_name] = {str(date): price for date, price in prices.items()}

    # swap Decimal values for Decimal128 ones, and cast the helper's output as a dictionary
    return cast(dict, convert_decimal(payload))


class ItemCategoryResponse(BaseModel):
Expand All @@ -42,11 +80,12 @@ class ItemBase(BaseModel):
poe_ninja_id: int
id_type: ItemIdType | None = None
name: str
price: Json[ItemPrice] | None = None
price_info: ItemPrice | None = None
type_: str | None = Field(None, serialization_alias="type")
variant: str | None = None
icon_url: str | None = None
enabled: bool = True
links: int | None = None
# enabled: bool = True


class ItemGroupMapping(TypedDict):
Expand Down
4 changes: 2 additions & 2 deletions src/schemas/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ def parse_sort_input(query_params: list[str] | None) -> list["SortSchema"] | Non
if first_char == "-":
operation: SORT_OPERATION = "desc"
field = query_param[1:]
elif first_char == "+" or first_char.isalpha():
elif first_char.isalpha():
operation: SORT_OPERATION = "asc"
field = query_param[1:] if first_char == "+" else query_param
field = query_param
else:
valid_params = False
break
Expand Down
14 changes: 0 additions & 14 deletions src/schemas/web_responses/poe.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,6 @@
}
}
},
400: {
"content": {
"application/json": {
"example": {
"error": {
"type": "invalid_input",
"message": "Invalid category group.",
"fields": None,
},
}
}
},
"data": None,
},
422: {
"content": {
"application/json": {
Expand Down
55 changes: 49 additions & 6 deletions src/scripts/poe_initial.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import asyncio
from asyncio import Queue
from dataclasses import dataclass
from decimal import Decimal
import json
import os
import time
from typing import Any, cast

from httpx import AsyncClient, RequestError
from loguru import logger
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, computed_field
import pydantic

from src.config.services import connect_to_mongodb
from src.models import document_models
from src.models.poe import Item, ItemCategory, ItemIdType
from src.models.poe import Item, ItemCategory
from src.schemas.poe import ItemIdType, ItemPrice


@dataclass
Expand All @@ -30,6 +32,11 @@ class ApiItemData:


# * these encapsulate required currency and item data for each entry from API responses
class ItemSparkline(BaseModel):
    """Holds the sparkline section of an item entry from API responses."""

    # list of data points; individual entries may be null
    data: list[Decimal | None]
    # nullable, but declared without a default value
    totalChange: Decimal | None


class CurrencyItemMetadata(BaseModel):
id_: int = Field(alias="id")
icon: str | None = None
Expand All @@ -38,14 +45,19 @@ class CurrencyItemMetadata(BaseModel):
class CurrencyItemEntity(BaseModel):
    """Represents a single currency entry parsed from API responses.

    Either of `pay` and `receive` may be missing; `prepare_item_record` reads the item's ID and listing count
    from `pay` when it is set, and from `receive` otherwise."""

    class Pay(BaseModel):
        """The 'pay' section of a currency entry."""

        pay_currency_id: int
        listing_count: int = 0

    class Receive(BaseModel):
        """The 'receive' section of a currency entry."""

        get_currency_id: int
        listing_count: int = 0

    currencyTypeName: str
    pay: Pay | None = None
    receive: Receive | None = None
    metadata: CurrencyItemMetadata | None = None
    # paySparkLine: ItemSparkline | None = None
    # receiveSparkLine: ItemSparkline | None = None
    # defaults to zero when absent from the parsed data
    chaosEquivalent: Decimal = Decimal(0)


class ItemEntity(BaseModel):
Expand All @@ -55,20 +67,36 @@ class ItemEntity(BaseModel):
variant: str | None = None
icon: str
itemType: str | None = None
chaosValue: Decimal = Decimal(0)
divineValue: Decimal = Decimal(0)
links: int | None = None
listingCount: int = 0
sparkline: ItemSparkline
lowConfidenceSparkline: ItemSparkline

@computed_field
@property
def low_confidence(self) -> bool:
    """Flags the item's pricing data as low confidence.

    True when the sparkline has fewer than 3 data points, or when the item has fewer than 10 listings while
    the low confidence sparkline has more than 3 data points."""

    short_sparkline = len(self.sparkline.data) < 3
    few_listings = self.listingCount < 10
    long_low_confidence_sparkline = len(self.lowConfidenceSparkline.data) > 3

    # `and` binds tighter than `or`; the parentheses only make that grouping explicit
    return short_sparkline or (few_listings and long_low_confidence_sparkline)


CATEGORY_GROUP_MAP = {
"Currency": [
Category("Currency", "Currency"),
Category("Fragments", "Fragment"),
Category("Coffins", "Coffin"),
Category("Allflame Embers", "AllflameEmber"),
Category("Tattoos", "Tattoo"),
Category("Omens", "Omen"),
Category("Divination Cards", "DivinationCard"),
Category("Artifacts", "Artifact"),
Category("Oils", "Oil"),
Category("Incubators", "Incubator"),
Category("Kalguuran Runes", "KalguuranRune"),
],
"EquipmentAndGems": [
Category("Unique Weapons", "UniqueWeapon"),
Expand Down Expand Up @@ -185,7 +213,8 @@ async def get_item_api_data(internal_category_name: str, client: AsyncClient) ->
endpoint, then parsing and returning the item data for the category."""

api_endpoint = "currencyoverview" if internal_category_name == "Currency" else "itemoverview"
url = f"/{api_endpoint}?league=Necropolis&type={internal_category_name}"
league = "Settlers"
url = f"/{api_endpoint}?league={league}&type={internal_category_name}"

item_data = []
currency_item_metadata = []
Expand Down Expand Up @@ -268,32 +297,46 @@ def prepare_item_record(
if item_entity.pay is not None:
poe_ninja_id = item_entity.pay.pay_currency_id
id_type = ItemIdType.pay
listings = item_entity.pay.listing_count
elif item_entity.receive is not None:
poe_ninja_id = item_entity.receive.get_currency_id
id_type = ItemIdType.receive
listings = item_entity.receive.listing_count
else:
logger.error(f"no pay or get id found for {item_entity.currencyTypeName}, skipping")
return

price_info = ItemPrice(chaos_price=item_entity.chaosEquivalent, listings=listings)

item_metadata = item_entity.metadata
item_record = Item(
poe_ninja_id=poe_ninja_id,
id_type=id_type,
name=item_entity.currencyTypeName,
type_=None,
category=category_record, # type: ignore
category=category_record.internal_name,
icon_url=item_metadata.icon if item_metadata else None,
price_info=price_info,
)

else:
item_entity = cast(ItemEntity, item_entity)

price_info = ItemPrice(
chaos_price=item_entity.chaosValue,
divine_price=item_entity.divineValue,
listings=item_entity.listingCount,
low_confidence=item_entity.low_confidence,
)
item_record = Item(
poe_ninja_id=item_entity.id_,
name=item_entity.name,
type_=item_entity.itemType,
category=category_record, # type: ignore
category=category_record.internal_name,
icon_url=item_entity.icon,
variant=item_entity.variant,
links=item_entity.links,
price_info=price_info,
)

return item_record
Expand Down
30 changes: 4 additions & 26 deletions src/services/poe.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from collections import defaultdict

from fastapi import HTTPException
from loguru import logger
from starlette import status

from src.models.poe import Item, ItemCategory
from src.schemas.poe import ItemBase, ItemGroupMapping, ItemCategoryResponse
Expand Down Expand Up @@ -40,29 +38,11 @@ def group_item_categories(item_categories: list[ItemCategoryResponse]) -> list[I


async def get_items(
category_group: str | None, pagination: PaginationInput, filter_sort_input: FilterSortInput | None
pagination: PaginationInput, filter_sort_input: FilterSortInput | None
) -> tuple[list[ItemBase], int]:
"""
Gets items by given category group, and the total items' count in the database. Raises a 400 error if category
group is invalid.
"""

item_category = None
if category_group is not None:
try:
item_category = await ItemCategory.find_one(ItemCategory.group == category_group)
except Exception as exc:
logger.error(f"error getting item category by group '{category_group}': {exc} ")
raise

if item_category is None:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid category group.")

if item_category is None:
query = Item.find()
else:
query = Item.find(Item.category.group == category_group) # type: ignore
"""Gets items by given category group, and the total items' count in the database."""

query = Item.find()
chainer = QueryChainer(query, Item)

if filter_sort_input is None:
Expand All @@ -88,9 +68,7 @@ async def get_items(
items = await paginated_query
items_count = await count_query
except Exception as exc:
logger.error(
f"error getting items from database category_group:'{category_group}'; filter_sort: {filter_sort_input}: {exc}"
)
logger.error(f"error getting items from database; filter_sort: {filter_sort_input}: {exc}")
raise

return items, items_count
24 changes: 24 additions & 0 deletions src/utils/jobs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import datetime as dt
from decimal import Decimal
import subprocess


from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from bson import Decimal128
from loguru import logger
from mypy_boto3_s3.service_resource import Bucket

Expand Down Expand Up @@ -75,3 +77,25 @@ def schedule_price_prediction_run(scheduler: AsyncIOScheduler) -> Job:
logger.info(f"scheduled '{job_id}' job to run daily")

return job


def convert_decimal(dict_item: dict | None):
"""This function iterates a dictionary looking for types of Decimal and converts them to Decimal128
Embedded dictionaries and lists are called recursively.
See: https://stackoverflow.com/questions/61456784/pymongo-cannot-encode-object-of-type-decimal-decimal"""

if dict_item is None:
return None

# for k,v in list(dict_item.items()):
for key, value in dict_item.items():
if isinstance(value, dict):
convert_decimal(value)
elif isinstance(value, list):
for entry in value:
convert_decimal(entry)
elif isinstance(value, Decimal):
dict_item[key] = Decimal128(str(value))

return dict_item

0 comments on commit e9a05ac

Please sign in to comment.