Skip to content

Commit

Permalink
Integrate Price Prediction Model for PoE Items (#15)
Browse files Browse the repository at this point in the history
* feat: fetch item price history data for price prediction

- add schema to parse fetched data into

* feat: predict future item prices

* chore: clean up price prediction script

* chore: parse Item's `price` field from json

* chore: update requirements

* feat: complete initial price prediction script

- add functions to get, map and bulk update db data
- bulk update items in db for efficiency
- use price api endpoint based on category type
- prevent error on receiving empty dataset in price predictor function
- add main function to handle the process

* chore: perform price prediction in batches

* chore: schedule price prediction job

* chore: cleanup the script

* fix: parse incoming historical prices as Decimals

* fix: change script runner function's logger message
  • Loading branch information
dhruv-ahuja authored Jun 22, 2024
1 parent 55e31f6 commit d283d9e
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 5 deletions.
14 changes: 14 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ click==8.1.7
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.3.0
contourpy==1.2.1
coverage==7.3.3
cryptography==42.0.4
cycler==0.12.1
dnspython==2.6.1
ecdsa==0.18.0
email-validator==2.1.0.post1
fastapi==0.109.2
fonttools==4.51.0
gunicorn==22.0.0
h11==0.14.0
hiredis==2.3.2
Expand All @@ -31,9 +34,12 @@ httpx==0.26.0
idna==3.7
iniconfig==2.0.0
jmespath==1.0.1
joblib==1.4.2
kiwisolver==1.4.5
kombu==5.3.4
lazy-model==0.2.0
loguru==0.7.2
memory-profiler==0.61.0
motor==3.3.1
mypy-boto3-cloudformation==1.28.83
mypy-boto3-cloudwatch==1.28.36
Expand All @@ -45,11 +51,15 @@ mypy-boto3-rds==1.28.84
mypy-boto3-s3==1.28.55
mypy-boto3-sqs==1.28.82
newrelic==9.3.0
numpy==1.26.4
orjson==3.9.15
packaging==23.2
passlib==1.7.4
patsy==0.5.6
pillow==10.3.0
pluggy==1.3.0
prompt-toolkit==3.0.41
psutil==5.9.8
pyasn1==0.5.1
pycparser==2.21
pycurl==7.45.2
Expand All @@ -58,6 +68,7 @@ pydantic-settings==2.1.0
pydantic_core==2.14.1
pyfakefs==5.3.2
pymongo==4.6.3
pyparsing==3.1.2
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0
Expand All @@ -72,9 +83,12 @@ redis==5.0.1
rsa==4.9
ruff==0.1.5
s3transfer==0.7.0
scikit-learn==1.5.0
scipy==1.13.1
six==1.16.0
sniffio==1.3.0
starlette==0.36.3
threadpoolctl==3.5.0
toml==0.10.2
types-awscrt==0.19.10
types-s3transfer==0.7.0
Expand Down
7 changes: 4 additions & 3 deletions src/config/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,13 @@ async def setup_services(app_: FastAPI) -> t.AsyncGenerator[None, t.Any]:
else:
logger.info("skipped scheduling, s3 connection was skipped")

delete_older_than = dt.datetime.utcnow() - dt.timedelta(days=1)
jobs.schedule_tokens_deletion(delete_older_than, async_scheduler)

scheduler.start()
async_scheduler.start()

delete_older_than = dt.datetime.utcnow() - dt.timedelta(days=1)
jobs.schedule_tokens_deletion(delete_older_than, async_scheduler)
jobs.schedule_price_prediction_run(async_scheduler)

# inject services into global app state
app_.state.queue = queue
app_.state.bucket = s3_bucket
Expand Down
5 changes: 3 additions & 2 deletions src/models/poe.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from enum import Enum

from beanie import Link
from pydantic import Field
from pydantic import Field, Json

from src.models.common import DateMetadataDocument
from src.schemas.poe import ItemPrice
Expand Down Expand Up @@ -30,11 +30,12 @@ class Item(DateMetadataDocument):
"""Item represents a Path of Exile in-game item. Each item belongs to a category. It contains information such as
item type and the current, past and predicted pricing, encapsulated in the `ItemPrice` schema."""

# TODO: ensure type_ field is serialized as 'type'
poe_ninja_id: int
id_type: ItemIdType | None = None
name: str
category: Link[ItemCategory]
price: ItemPrice | None = None
price: Json[ItemPrice] | None = None
type_: str | None = Field(None, serialization_alias="type")
variant: str | None = None
icon_url: str | None = None
Expand Down
265 changes: 265 additions & 0 deletions src/scripts/price_prediction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
import asyncio
import datetime as dt
from decimal import Decimal
import time
from typing import Annotated

from httpx import AsyncClient, HTTPError
from loguru import logger
import motor
import motor.motor_asyncio
from numpy import ndarray
import numpy
import pandas as pd
from pydantic import BaseModel, BeforeValidator, Field, TypeAdapter
import pydantic
import pymongo
import statsmodels.api as sm
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LinearRegression

from src.config.services import connect_to_mongodb
from src.models import document_models
from src.models.poe import Item, ItemCategory
from src.schemas.poe import Currency, ItemPrice


# Base URL of the poe.ninja data API; the price history request paths are resolved relative to it.
API_BASE_URL = "https://poe.ninja/api/data"

# Number of Item records fetched from the database and processed per batch.
BATCH_SIZE = 500


# schema logic
def round_off_price(value: float):
    """Converts a raw price into a `Decimal` holding exactly two decimal places.

    The value is converted to `Decimal` as-is (so a float keeps its exact binary value) and is then rounded using
    the active decimal context's rounding mode.
    """

    two_places = Decimal("0.01")
    exact_value = Decimal(value)
    return exact_value.quantize(two_places)


class PriceHistoryEntity(BaseModel):
    """Schema for a single entry of an item's price history, as parsed from the price history API response."""

    count: int
    # the incoming value is passed through `round_off_price` before validation, yielding a 2-decimal-place Decimal
    value: Annotated[Decimal, BeforeValidator(round_off_price)]
    # populated from the `daysAgo` key of the incoming data
    days_ago: int = Field(alias="daysAgo")

    def convert_days_ago_to_date(self):
        """Returns the timezone-aware UTC datetime that lies `days_ago` days before the current moment."""

        age = dt.timedelta(days=self.days_ago)
        return dt.datetime.now(dt.UTC) - age


async def get_items(offset: int) -> list[Item]:
    """Fetches a single batch of Items from the database: at most `BATCH_SIZE` records, skipping the first `offset`.

    Any error is logged together with the offset and then re-raised.
    """

    try:
        # links are deliberately not fetched, as each object would otherwise fetch its own category record
        batch_query = Item.find_all(skip=offset, limit=BATCH_SIZE)
        batch = await batch_query.to_list()
    except Exception as exc:
        logger.error(f"error getting items with offset {offset}: {exc}")
        raise
    else:
        return batch


async def get_and_map_categories() -> dict[str, ItemCategory]:
    """Loads every item category from the database and returns a lookup keyed by the stringified category Id.

    Any error while loading is logged and then re-raised.
    """

    try:
        all_categories = await ItemCategory.find_all().to_list()
    except Exception as exc:
        logger.error(f"error getting item categories: {exc}")
        raise

    lookup: dict[str, ItemCategory] = {}
    for record in all_categories:
        lookup[str(record.id)] = record

    return lookup


async def update_items_data(items: list[Item], iteration_count: int) -> None:
    """Bulk-updates the `price` field of the given items in the database.

    Each item's price schema is serialized into a JSON string (or `None` when the item has no price) and wrapped in
    a Pymongo-native `UpdateOne` operation. All operations are then sent in one bulk write for efficiency over
    updating each record one-by-one.

    Errors raised by the bulk write are logged and not re-raised.

    Args:
        items: items whose current `price` value should be persisted.
        iteration_count: number of the batch being written; only used in log messages.
    """

    if not items:
        # pymongo refuses to execute an empty list of operations, so skip the write instead of logging an error
        logger.info(f"no items to update in batch number {iteration_count}, skipping bulk update")
        return

    item_collection: motor.motor_asyncio.AsyncIOMotorCollection = Item.get_motor_collection()

    bulk_operations = [
        pymongo.UpdateOne(
            {"_id": item.id},
            {
                "$set": {"price": item.price.model_dump_json() if item.price else None},
            },
        )
        for item in items
    ]

    try:
        result = await item_collection.bulk_write(bulk_operations)
    except Exception as exc:
        logger.error(f"error bulk writing: {exc}")
        logger.error(f"{type(exc)}")
    else:
        # interpolate the result into the message itself: passed as a positional argument it would be dropped,
        # since the message contains no `{}` placeholder for loguru to fill
        logger.info(f"result from batch number {iteration_count}'s bulk update: {result.bulk_api_result}")


async def get_price_history_data(
    category_internal_name: str, item_id: int, league: str = "Necropolis"
) -> list[PriceHistoryEntity]:
    """Gets all available price history data for the given item_id, and parses it into a consistent schema model.

    Categories "Currency" and "Fragment" are served by the currency history endpoint, every other category by the
    item history endpoint. The currency endpoint's response is a dictionary whose `receiveCurrencyGraphData` key
    holds the entries; a list response is used as-is.

    Args:
        category_internal_name: internal name of the item's category, sent as the `type` query parameter.
        item_id: identifier of the item, sent as `currencyId` or `itemId` depending on the endpoint.
        league: league to fetch prices for, sent as the `league` query parameter.

    Returns:
        The parsed price history entries, or an empty list if the request failed, the response body was not valid
        JSON, or the data did not match the expected schema. All such failures are logged.
    """

    if category_internal_name in ("Currency", "Fragment"):
        url = "/currencyhistory"
        id_param = "currencyId"
    else:
        url = "/itemhistory"
        id_param = "itemId"

    # let httpx build the query string so that parameter values are always URL-encoded
    params = {"league": league, "type": category_internal_name, id_param: item_id}

    async with AsyncClient(base_url=API_BASE_URL) as client:
        try:
            response = await client.get(url, params=params)
            response.raise_for_status()
            price_history_api_data: list[dict] | dict[str, list[dict]] = response.json()
        except (HTTPError, ValueError) as exc:
            # ValueError covers a response body that cannot be decoded as JSON
            logger.error(
                f"error getting price history data for item_id {item_id} belonging to '{category_internal_name}' category: {exc}"
            )
            return []

    if isinstance(price_history_api_data, dict):
        try:
            price_history_api_data = price_history_api_data["receiveCurrencyGraphData"]
        except KeyError as exc:
            logger.error(
                f"error parsing price history data for item_id {item_id} belonging to '{category_internal_name}' category: missing key {exc}"
            )
            return []

    try:
        ta = TypeAdapter(list[PriceHistoryEntity])
        return ta.validate_python(price_history_api_data)
    except pydantic.ValidationError as exc:
        logger.error(
            f"error parsing price history data for item_id {item_id} belonging to '{category_internal_name}' category: {exc}"
        )
        return []


def predict_future_item_prices(price_history_data: list[PriceHistoryEntity], days: int = 4) -> ndarray:
    """Predicts future item prices based on the last 30 entries of the given price history.

    A degree-2 polynomial regression of `value` over `days_ago` is fitted, and then evaluated for the `days` days
    following the most recent entry.

    Args:
        price_history_data: price history entries of a single item.
        days: number of future days to predict prices for.

    Returns:
        An array of `days` predicted prices, where index 0 is the day right after the most recent entry. The array
        is empty when there is no price history or `days` is less than 1.
    """

    if not price_history_data or days < 1:
        return numpy.empty(0)

    df = pd.DataFrame([entity.model_dump() for entity in price_history_data])

    # Filter and keep only the last 30 entries
    df = df.tail(30)

    # NOTE: integrating count (extra variable) would require a more complex model; only `days_ago` is used as the
    # independent variable (X)
    X = df[["days_ago"]]

    # Set the dependent variable (Y); values are Decimals, convert them once for the numeric model
    Y = df["value"].astype(float)

    # Expand X into polynomial terms; the intercept is fitted by LinearRegression itself, hence no bias column
    poly = PolynomialFeatures(degree=2, include_bias=False)
    X_poly = poly.fit_transform(X)

    # Fit polynomial regression model
    model_poly = LinearRegression()
    model_poly.fit(X_poly, Y)

    # The smallest `days_ago` is the most recent entry; days after it have a smaller (eventually negative)
    # `days_ago`, so future days are reached by subtracting from it rather than adding to it
    latest_days_ago = df["days_ago"].min()
    future_data = pd.DataFrame({"days_ago": [latest_days_ago - i for i in range(1, days + 1)]})

    # Transform future data for polynomial regression and predict future values
    future_X_poly = poly.transform(future_data)
    return model_poly.predict(future_X_poly)


async def main():
    """Runs price prediction for all Items in the database, batch by batch.

    For every item in a batch, its price history is fetched, future prices are predicted from it and the resulting
    price data is set on the item. Items whose category cannot be resolved are skipped and logged. Once a batch is
    processed, its items are bulk-updated in the database.
    """

    offset = iteration_count = 0

    await connect_to_mongodb(document_models)

    start = time.perf_counter()
    category_map = await get_and_map_categories()

    total_items = await Item.count()

    while offset < total_items:
        batch_start = time.perf_counter()
        items = await get_items(offset)

        if not items:
            # fewer items than initially counted are left (e.g. records were removed mid-run); nothing more to do
            break

        for item in items:
            item_category_id = str(item.category.ref.id)
            try:
                item_category = category_map[item_category_id]
            except KeyError:
                logger.error(f"item category not found for '{item.name}' item with category id: {item_category_id}")
                continue

            price_history_data = await get_price_history_data(item_category.internal_name, item.poe_ninja_id)
            price_predictions = predict_future_item_prices(price_history_data)

            # price data is applied per item, right away: this pairs each item with its own history and predictions
            # and keeps skipped items or previous batches from shifting the pairing
            await add_item_price_data([item], price_history_data, [price_predictions])

        await update_items_data(items, iteration_count)
        batch_stop = time.perf_counter()

        logger.info(
            f"time taken for price predictions for batch {iteration_count + 1} of items: {batch_stop - batch_start}"
        )

        offset += BATCH_SIZE
        iteration_count += 1

    stop = time.perf_counter()
    logger.info(f"total time taken for predicting prices of {total_items}: {stop - start}")


async def add_item_price_data(
    items: list[Item], price_history_data: list[PriceHistoryEntity], item_price_prediction_data: list[ndarray]
):
    """Converts historical price data into the desired format and adds it and current, future price data to
    respective Item records.

    The given price history is applied to every item in `items`, so items with differing histories must be passed
    in separate calls. Nothing is changed when the price history is empty.

    Args:
        items: items to set price data on; paired positionally with `item_price_prediction_data`.
        price_history_data: price history entries; the last entry is used as the current price and the last 7
            entries as the price history.
        item_price_prediction_data: predicted prices per item, where index 0 of each array is mapped to one day
            from now, index 1 to two days from now, and so on.
    """

    if not price_history_data:
        return

    now = dt.datetime.now(dt.UTC)

    # the history is the same for all given items, so derive everything based on it only once
    todays_price_data = price_history_data[-1]
    price_history_mapping = {
        entity.convert_days_ago_to_date(): entity.value for entity in price_history_data[-7:]
    }

    for item, price_prediction_data in zip(items, item_price_prediction_data):
        price_prediction_mapping = {
            now + dt.timedelta(days=index + 1): round(Decimal(value), 2)
            for index, value in enumerate(price_prediction_data)
        }

        item.price = ItemPrice(
            price=todays_price_data.value,
            currency=Currency.chaos,
            # each item gets its own copy of the shared history mapping
            price_history=dict(price_history_mapping),
            price_history_currency=Currency.chaos,
            price_prediction=price_prediction_mapping,
            price_prediction_currency=Currency.chaos,
        )


# Run the complete price prediction process when this module is executed directly as a script.
if __name__ == "__main__":
    asyncio.run(main())
Loading

0 comments on commit d283d9e

Please sign in to comment.