Skip to content

Commit

Permalink
test code
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Aug 30, 2023
1 parent f9f96b1 commit 3bbb8b1
Show file tree
Hide file tree
Showing 9 changed files with 798 additions and 0 deletions.
23 changes: 23 additions & 0 deletions .dlt/.pipelines
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
engine_version: 1
pipelines:
chess:
is_dirty: false
last_commit_sha: 6792c713c2a0343e468f89792a58850195bf9528
last_commit_timestamp: '2023-06-05T14:09:46+02:00'
files:
chess/__init__.py:
commit_sha: 6792c713c2a0343e468f89792a58850195bf9528
git_sha: 66ccbbdf9e59e8bcb0b932a80eb033a298c3e9f5
sha3_256: fe7d5923ae62f7bba3b6aa0447af9f7c187f719636e62410d3d0c0c193abed26
chess/README.md:
commit_sha: 6792c713c2a0343e468f89792a58850195bf9528
git_sha: 4d2c6988018890d8834b32874887f59d3c4ef4a1
sha3_256: 7c0de2dd5788615f1a18f5b1018ad0013070b82204a780becb8706869447a1c8
chess/settings.py:
commit_sha: 6792c713c2a0343e468f89792a58850195bf9528
git_sha: e7c335449972e4e4de5ee1035114571f135fd9cf
sha3_256: 01728e594272c541af519f58ebb3ef1b13bcd9253fafc8cfdea532449f96d017
chess/helpers.py:
commit_sha: 6792c713c2a0343e468f89792a58850195bf9528
git_sha: c607c5172afbf52578ec1563cc5eb45f4c209cfd
sha3_256: ed62520d9d09960ce88719306d5f080876435b6425808c5587a6078eaa093538
9 changes: 9 additions & 0 deletions .dlt/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# put your configuration values here

[runtime]
log_level="WARNING" # the system log level of dlt
# use the dlthub_telemetry setting to enable/disable anonymous usage data reporting, see https://dlthub.com/docs/telemetry
dlthub_telemetry = true

[sources.chess]
config_int = 0 # please set me up!
75 changes: 75 additions & 0 deletions chess/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
---
title: Chess.com
description: dlt pipeline for Chess.com API
keywords: [chess.com api, chess.com pipeline, chess.com]
---

# Chess.com

This pipeline can be used to load player data from the [Chess.com API](https://www.chess.com/news/view/published-data-api) into a [destination](../general-usage/glossary.md#destination) of your choice.

## Initialize the pipeline

Initialize the pipeline with the following command:
```
dlt init chess bigquery
```
Here, we chose BigQuery as the destination. To choose a different destination, replace `bigquery` with your choice of destination.

Running this command will create a directory with the following structure:
```bash
├── .dlt
│ ├── .pipelines
│ ├── config.toml
│ └── secrets.toml
├── chess
│   ├── __pycache__
│ └── __init__.py
├── .gitignore
├── chess_pipeline.py
└── requirements.txt
```

## Add credentials

Before running the pipeline you may need to add credentials in the `.dlt/secrets.toml` file for your chosen destination. For instructions on how to do this, follow the steps detailed under the desired destination in the [destinations](https://dlthub.com/docs/destinations) page.

## Run the pipeline

1. Install the necessary dependencies by running the following command:
```
pip install -r requirements.txt
```
2. Now the pipeline can be run by using the command:
```
python3 chess_pipeline.py
```
3. To make sure that everything is loaded as expected, use the command:
```
dlt pipeline chess_pipeline show
```

## Customize parameters

Without any modifications, the chess pipeline will load data for a default list of players over a default period of time. You can change these values in the `chess_pipeline.py` script.

For example, if you wish to load player games for a specific set of players, add the player list to the function `load_players_games_example` as below.
```python
def load_players_games_example(start_month: str, end_month: str):

pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination='bigquery', dataset_name="chess_players_games_data")

data = chess(
[], # Specify your list of players here
start_month=start_month,
end_month=end_month
)

info = pipeline.run(data.with_resources("players_games", "players_profiles"))
print(info)
```
To specify the time period, pass the starting and ending months as parameters when calling the function in the `__main__` block:
```python
if __name__ == "__main__" :
load_players_games_example("2022/11", "2022/12") # Replace the strings "2022/11" and "2022/12" with different months in the "YYYY/MM" format
```
166 changes: 166 additions & 0 deletions chess/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""A pipeline loading player profiles and games from chess.com api"""

from typing import Callable, Iterator, List, Sequence, Dict, Any

import dlt
from dlt.common import pendulum
from dlt.common.typing import TDataItem, StrAny
from dlt.extract.source import DltResource
from dlt.sources.helpers import requests
from .helpers import get_url_with_retry, get_path_with_retry, validate_month_string

from .settings import UNOFFICIAL_CHESS_API_URL


@dlt.source(name="chess")
def source(
    players: List[str], start_month: str = None, end_month: str = None
) -> Sequence[DltResource]:
    """
    Bundles the chess.com API resources (player profiles, archives, games and online status)
    into a single dlt source.
    Args:
        players (List[str]): Usernames of the players whose data should be fetched.
        start_month (str, optional): Passed to `players_games`; matches before this month are skipped. Defaults to None.
        end_month (str, optional): Passed to `players_games`; matches after this month are skipped. Defaults to None.
    Returns:
        Sequence[DltResource]: The resources, in this order: players_profiles, players_archives,
        players_games, players_online_status.
    """
    # every resource is built from the same list of players
    profiles = players_profiles(players)
    archives = players_archives(players)
    # only the games resource takes the month filter
    games = players_games(players, start_month=start_month, end_month=end_month)
    online_status = players_online_status(players)
    return (profiles, archives, games, online_status)


@dlt.resource(write_disposition="replace")
def players_profiles(players: List[str]) -> Iterator[TDataItem]:
    """
    Yields one deferred profile request per player username.
    Args:
        players (List[str]): Usernames of the players whose profiles should be fetched.
    Yields:
        Iterator[TDataItem]: Deferred items, each resolving to the profile data of one player.
    """

    # get profiles in parallel by decorating the http request with defer
    @dlt.defer
    def _get_profile(player_name: str) -> TDataItem:
        profile_path = f"player/{player_name}"
        return get_path_with_retry(profile_path)

    yield from (_get_profile(player_name) for player_name in players)


@dlt.resource(write_disposition="replace", selected=False)
def players_archives(players: List[str]) -> Iterator[List[TDataItem]]:
    """
    Yields, for each player, the list stored under the "archives" key of the archives endpoint response.
    Args:
        players (List[str]): Usernames of the players whose archive lists should be fetched.
    Yields:
        Iterator[List[TDataItem]]: One list of archive entries per player; an empty list when the
        response has no "archives" key.
    """
    # paths are generated lazily so each request happens right before its result is yielded
    archive_paths = (f"player/{name}/games/archives" for name in players)
    for path in archive_paths:
        response = get_path_with_retry(path)
        yield response.get("archives", [])


@dlt.resource(write_disposition="append")
def players_games(
    players: List[str], start_month: str = None, end_month: str = None
) -> Iterator[Callable[[], List[TDataItem]]]:
    """
    Yields the games of `players` that were played between `start_month` and `end_month`.
    Archive urls that were already requested are remembered in the resource state and skipped.
    Args:
        players (List[str]): Usernames of the players whose games should be fetched.
        start_month (str, optional): First month to include, in "YYYY/MM" format. Defaults to None (no lower bound).
        end_month (str, optional): Last month to include, in "YYYY/MM" format. Defaults to None (no upper bound).
    Yields:
        Iterator[Callable[[], List[TDataItem]]]: Deferred requests, each returning the list of games of one archive.
    """
    # simple validation that catches common mistakes in the month format
    for month in (start_month, end_month):
        validate_month_string(month)

    # urls of archives handled so far; the list lives in the resource state, which
    # is expected to hold the same content the next time this function is called
    seen_archives = dlt.current.resource_state().setdefault("archives", [])

    # decorating the http request with defer lets the archives be fetched in parallel
    @dlt.defer
    def _get_archive(url: str) -> List[TDataItem]:
        print(f"Getting archive from {url}")
        try:
            return get_url_with_retry(url).get("games", [])  # type: ignore
        except requests.HTTPError as http_err:
            # anything other than "not found" is a real error
            if http_err.response.status_code != 404:
                raise
            # sometimes archives are not available and the error seems to be permanent
            return []

    # a resource can be called like any other function and iterated like a list
    for archive_url in players_archives(players):
        # the url format is https://api.chess.com/pub/player/{username}/games/{YYYY}/{MM},
        # so the last 7 characters are the archive month
        archive_month = archive_url[-7:]
        if start_month and archive_month < start_month:
            continue
        if end_month and archive_month > end_month:
            continue
        # do not download the same archive again
        if archive_url in seen_archives:
            continue
        seen_archives.append(archive_url)
        yield _get_archive(archive_url)


@dlt.resource(write_disposition="append")
def players_online_status(players: List[str]) -> Iterator[TDataItem]:
    """
    Yields the current online status of each player.
    Args:
        players (List[str]): Usernames of the players whose online status should be checked.
    Yields:
        Iterator[TDataItem]: One dict per player with the keys "username", "onlineStatus",
        "lastLoginDate" and "check_time".
    """
    # the unofficial endpoint is used because the official one seems to be removed
    for username in players:
        popup_url = "%suser/popup/%s" % (UNOFFICIAL_CHESS_API_URL, username)
        popup = get_url_with_retry(popup_url)
        # keep only the relevant fields of the response
        online_status = popup["onlineStatus"]
        last_login = popup["lastLoginDate"]
        yield {
            "username": username,
            "onlineStatus": online_status,
            "lastLoginDate": last_login,
            "check_time": pendulum.now(),  # dlt can deal with native python dates
        }


@dlt.source
def chess_dlt_config_example(
    secret_str: str = dlt.secrets.value,
    secret_dict: Dict[str, Any] = dlt.secrets.value,
    config_int: int = dlt.config.value,
) -> DltResource:
    """
    Example source whose arguments default to `dlt.secrets.value` / `dlt.config.value`.
    Args:
        secret_str (str, optional): A secret string. Defaults to dlt.secrets.value.
        secret_dict (Dict[str, Any], optional): A secret dictionary. Defaults to dlt.secrets.value.
        config_int (int, optional): A config integer. Defaults to dlt.config.value.
    Returns:
        DltResource: A resource named "config_values" over the three received values.
    """
    configured_values = [secret_str, secret_dict, config_int]
    # NOTE: every value, including the secrets, is echoed to stdout - this is just a test
    for value in configured_values:
        print(value)

    return dlt.resource(configured_values, name="config_values")
20 changes: 20 additions & 0 deletions chess/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Chess pipeline helpers"""

from dlt.common.typing import StrAny
from dlt.sources.helpers import requests
from .settings import OFFICIAL_CHESS_API_URL


def get_url_with_retry(url: str) -> StrAny:
    """Sends a GET request to `url` and returns the response body decoded from JSON.

    NOTE(review): no retry logic is visible in this function; presumably the retries
    come from the `requests` helper imported from `dlt.sources.helpers` - confirm.
    """
    return requests.get(url).json()  # type: ignore


def get_path_with_retry(path: str) -> StrAny:
    """Appends `path` to `OFFICIAL_CHESS_API_URL` and returns the JSON found at that url."""
    full_url = f"{OFFICIAL_CHESS_API_URL}{path}"
    return get_url_with_retry(full_url)


def validate_month_string(string: str) -> None:
    """Validates that the string is in YYYY/MM format

    Args:
        string (str): The month to check. `None` and the empty string are accepted without checks.

    Raises:
        ValueError: If `string` is not exactly four digits, a slash and a two digit month
            between 01 and 12.
    """
    if not string:
        return
    # slicing never raises, so too short values end up as ValueError instead of IndexError
    year, separator, month = string[:4], string[4:5], string[5:]
    well_formed = (
        len(string) == 7
        and separator == "/"
        and string.isascii()  # rules out non-ascii digits accepted by isdigit()
        and year.isdigit()
        and month.isdigit()
        and 1 <= int(month) <= 12
    )
    if not well_formed:
        raise ValueError(f"Month must be in YYYY/MM format, got {string!r}")
4 changes: 4 additions & 0 deletions chess/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Chess pipeline settings and constants"""

OFFICIAL_CHESS_API_URL = "https://api.chess.com/pub/"
UNOFFICIAL_CHESS_API_URL = "https://www.chess.com/callback/"
52 changes: 52 additions & 0 deletions chess_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import dlt
from chess import source
import os

def load_table_counts(p: "dlt.Pipeline", *table_names: str) -> dict:
    """Returns row counts for `table_names` as dict

    Args:
        p (dlt.Pipeline): Pipeline whose sql client is used to run the counting query.
        *table_names (str): Names of the tables to count. They are interpolated into the
            query text as is, so they must be trusted identifiers.

    Returns:
        dict: Maps each table name to its row count; empty when no table names are given.
    """
    # without tables the joined query would be an empty string, so skip the destination
    if not table_names:
        return {}

    # try sql, could be other destination though
    query = "\nUNION ALL\n".join(
        f"SELECT '{name}' as name, COUNT(1) as c FROM {name}" for name in table_names
    )
    with p.sql_client() as c:
        with c.execute_query(query) as cur:
            # each row is (table name, row count)
            return {row[0]: row[1] for row in cur.fetchall()}


def load_players_games_example(start_month: str, end_month: str) -> None:
    """Loads games and profiles of a fixed list of players for a range of months into duckdb.

    Sets the `NORMALIZE__SCHEMA_UPDATE_MODE` environment variable, runs the pipeline and
    prints the load info followed by the row count of every data table.

    Args:
        start_month (str): First month to load, in YYYY/MM format.
        end_month (str): Last month to load, in YYYY/MM format.
    """
    os.environ['NORMALIZE__SCHEMA_UPDATE_MODE'] = "freeze-and-raise"

    player_names = ["magnuscarlsen", "vincentkeymer", "dommarajugukesh", "rpragchess"]

    # configure the pipeline: provide the destination and dataset name to which the data should go
    # NOTE(review): full_refresh=True presumably starts every run from a fresh dataset and state,
    # which would defeat the "do not download archive again" logic of the source - confirm.
    pipeline = dlt.pipeline(
        pipeline_name="chess_pipeline",
        destination='duckdb',
        dataset_name="chess_players_games_data",
        import_schema_path="schemas/import",
        export_schema_path="schemas/export",
        full_refresh=True,
    )

    # create the data source from the players and the start/end month in YYYY/MM format
    chess_source = source(player_names, start_month=start_month, end_month=end_month)
    # out of all the possible resources, load only "players_games" and "players_profiles"
    selected = chess_source.with_resources("players_games", "players_profiles")
    load_info = pipeline.run(selected)
    print(load_info)

    table_names = [table["name"] for table in pipeline.default_schema.data_tables()]
    print(load_table_counts(pipeline, *table_names))


def load_players_games_incrementally() -> None:
    """Runs the example load twice with overlapping month ranges; the intent is that the same game archive is not loaded twice"""
    month_ranges = (
        ("2022/11", "2022/11"),  # loads games for 11.2022
        ("2022/11", "2022/12"),  # second load is meant to skip 11.2022 and load only 12.2022
    )
    for first_month, last_month in month_ranges:
        load_players_games_example(first_month, last_month)


if __name__ == "__main__":
    # script entry point: run the main example for 11.2022 through 12.2022
    load_players_games_example(start_month="2022/11", end_month="2022/12")
Loading

0 comments on commit 3bbb8b1

Please sign in to comment.